私はalpakkaを使用して複数のjmsSource(異なるキュー用)を開始するシナリオを持っています。私はまた、キューをいつでも切り離す必要があります。そこで、以下のようにjms akkaストリームにKillSwitchを追加しました: -Akkaはalpakka jmsのKillSwitchをストリームします
trait MessageListener {
lazy val jmsPipeline = jmsSource
.map { x => log.info(s"Received message ${x} from ${queue}"); x }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
(Keep.both)
.run()
def start(): Unit = {
log.info("Invoking listener : {}", queue)
jmsPipeline
log.info("listener : {} started", queue)
}
def stop():Unit = jmsPipeline._1.shutdown()
def queue: String
}
object ListenerA extends MessageListener {
override def queue: String = "Queue_A"
}
object ListenerB extends MessageListener {
override def queue: String = "Queue_B"
}
などと同様です。
アプリケーションを起動すると、すべてのキューが接続され正常に動作します。しかし、私がstopメソッドを使ってキューをデタッチしようとすると、すべてのキューが切断されず、動作がランダムになるわけではありません。また、killSwitchがすべてのリスナーで異なることも確認しました。
ここで何が間違っているのか教えてください。