2016-08-31 14 views
2

akka StreamsのActorRefSourceを使用してアイテムのシーケンスを構築したいとします。前記ソースにはデータが連続的に供給される。計算が完了した後、ストリームはPoison Pillで終了します。Akka Streams ActorRefSourceメッセージの順序

次簡略化した例は、私の意思を示しています。

val source = Source.actorRef[Int](1000, OverflowStrategy.fail) 
    .mapMaterializedValue{ ref => 
     for(i <- 1 to 1000) { 
     ref ! i 
     } 

     ref ! PoisonPill 
    } 

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size)) 

を私はすべて1000個の要素を処理するストリームを期待して、受信されているポイズンピルのために終了しました。残念ながら、ストリームは通常、ずっと早く終了します。出力例:

count: 24 

ポイズンピルを送信する前にしばらく待ってください。 1000ミリ秒になると、すべての数値が処理されます。

ポイズンピルを受け取る前にすべてのアイテムが処理されていることを確認する方法については、非常に感謝します。

答えて

2

the documentation for Source.actorRefを参照してください。PoisonPillはストリームを終了する前にバッファをフラッシュしません。

+0

いいえ、シャットダウン前にすべてのアイテムが処理されるように、代わりにakka.actor.Status.Succesのインスタンスを渡す必要がありますか? 説明をありがとう! – Calardan

関連する問題