2017-11-27 1 views
0

を上書き:reactiveConnectionserviceRabbitConnectionアッカストリーム私は、次のグラフを持っているKillSwitch

case class FlowFactory() { 

    val reactiveConnection = ??? 
    val serviceRabbitConnection = ??? 

    val switch = KillSwitches.single[Routed] 

    val stream: RunnableGraph[UniqueKillSwitch] = RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw => 
    import GraphDSL.Implicits._ 

    val in = builder.add(Source.fromPublisher(reactiveConnection.consume(???))) 
    val context = builder.add(contextFlow(serviceRabbitConnection)) 
    val inflate = builder.add(inflateFlow()) 
    val compute = builder.add(computeFlow()) 
    val out = builder.add(Sink.fromSubscriber(reactiveConnection.publish())) 

    in ~> context ~> inflate ~> compute ~> sw ~> out 

    ClosedShape 
    }) 

    val killSwitch = stream.run() 

    killSwitch.shutdown() 

} 

とき、私はシャットダウンストリームを、私はまた、次の接続を殺すために必要があります。 どうすれば実現できますか?KillSwitchshutdown()メソッドをオーバーライドする簡単な方法はありますか? onComplete()またはonClose()のように、ストリームが閉じられたときに呼び出されるメソッドはありますか?

答えて

2

コールバックをストリームに追加するには、シンク(Sink.onComplete)を追加してください。

val sink1 = Sink.fromSubscriber(reactiveConnection.publish()) 
    val sink2 = Sink.onComplete{ 
    case Success(_) ⇒ println("success!") 
    case Failure(e) ⇒ println(s"failure - $e") 
    } 

    val out = builder.add(Sink.combine(sink1, sink2)(Broadcast(_))) 
関連する問題