2017-07-26 18 views
1

私はRunnableGraphのように次のようになります。 とmergeステージの間に単純なmapがある場合、すべてが問題ありません。ただし、mapConcatになると、このコードは最初の要素を消費した後には機能しません。Akka-stream mapConcatが循環RunnableGraphで動作しない

なぜ動作しないのか知りたい。

RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
import GraphDSL.Implicits._ 

val M = b.add(MergePreferred[Int](1)) 
val B = b.add(Broadcast[Int](2)) 
val S = Source(List(3)) 

S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore 
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B 
ClosedShape 
}) 

// run() output: 
// 3 
// List(2,2,2) 

答えて

0

mapConcatステージがフィードバックループをブロックし、これが予想されます。要求が来なければならない

  • (2、2、2)mapConcatステージ3つの利用可能な要素の最初のを放出する要求を必要

    1. mapConcat機能プリントList(2,2,2)
    2. :以下のイベントのチェーンを考えますマージステージから、したがってブロードキャストステージから取得します。
    3. ブロードキャストステージは、いずれかのダウンストリームバックプレッシャがある場合にバックプレッシャを行います。その下流にはSink.ignore(背圧がかからない)とmapConcatがあります。
    4. mapConcatのバックプレッシャ「docsによると、以前に計算されたコレクションに残っている要素が残っています。これは事実です。

    つまり、サイクルがアンバランスです。フィードバックループでは、削除するよりも多くの要素を導入しています。

    この問題は、this documentation pageで詳細に説明されています。この解決策のいくつかについても説明します。具体的なケースでは、フィルタの段階があるため、より大きいバッファを導入すると、すべての要素が印刷されます。ただし、グラフはハングして後で完了しないことに注意してください。

    S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore 
    M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B 
    
  • +0

    本当に有益です!あなたの説明の後、オーバーフロー時にメッセージを失いたくないので、私は 'conflateWithSeed'が自分の状況に適していることを知ります。 –

    関連する問題