2017-12-19 20 views
1

私はこの二つのシンクなどのストリームを持っているが、一方のみが一度に使用されます。複数のシンクが

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink1) 

または

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink2) 

それは設定可能です我々が使用沈みます両方のシンクを並行して使用するとどうなりますか? どうすればいいですか?

私はSink.combineについて考えましたが、マージ戦略も必要であり、これらのシンクの結果をどのように組み合わせても構いません。私は本当に気にしないので、同じデータをHTTP経由でいくつかのエンドポイントに送信し、同時にそれらをデータベースに送信したいと思います。 シンクの組み合わせはブロードキャストと非常によく似ていますが、スクラッチからブロードキャストを実装するとコードの可読性が低下します。シンプルなソース、フロー、シンクしかなく、低レベルのグラフステージはありません。

あなたはそれを行うための適切な方法を知っていますか(背圧と、1つのシンクだけを使用している他のもの)?

答えて

4

あなたは(API docsを参照)alsoToを使用することができます。

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore) 
+0

2番目のシンクの前にシンプルな.asyncを追加すると、これらのシンクを並行して実行できますか? 私はそれらを並行して実行したいと思いますが、背圧が残っています。つまり、すべてのシンクで費やされた時間の合計ではなく、最も遅いシンクで過ごす時間と同じ速さでストリームを実行したいそれらは同期して実行されるため)。 –

3

放送は可読性を低下させてはならない最も単純な形式でGraphDSLを使用して - 実際には、1にも何らかの形で~>節が視覚化することを主張するかもしれませんストリーム構造:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val bcast = builder.add(Broadcast[Int](2)) 

    Source.fromElements(1, 2, 3) ~> flow ~> bcast.in 
    bcast.out(0) ~> sink1 
    bcast.out(1) ~> sink2 

    ClosedShape 
}) 
graph.run() 
+0

これらのシンクは並行して動作しますか? –

+0

デフォルトでは、Akka Streamsはグラフ処理ステージを順番に実行しますが、必要に応じて、 'async'メソッドを使用して並列実行できます。詳細は、このトピックの[Akka Stream doc](https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html?language=scala)を参照してください。 –

関連する問題