2017-09-29 6 views
1

バッチリクエストを作成する要素とシンクを持つソースがあります。 私はKillSwitchを使用して、任意の時点でグラフをシャットダウンできます。​​が Akkaストリーム - データを失うことなくグループ化したシャットダウンストリーム

val source = Source.tick(10.millis, 10.millis, "tick").grouped(500) 

val (switch, _) = source.viaMat(KillSwitches.single)(Keep.right) 
.toMat(sink)(Keep.both).run() 

Thread.sleep(3000) // wait some arbitrary time 

switch.shutdown() 

シャットダウンが発生したときに、不完全なバッチを「洗い流す」する方法はあり

と呼ばれているとき、最新の不完全なバッチのレコードはソース出力が迷子にされているということを問題?

答えて

3

キルスイッチシャットダウンの動作は、呼び出した後、そのドキュメント

通り、位置である[(UniqueKillSwitch#シャットダウンを)]] [[グラフ]] [[FlowShapeの の実行インスタンス]]]は、そのダウンストリームを完了し、 上流を取り消します(完了したか失敗した場合は、 コマンドは無視されます)。

他のドキュメントも参照してください。here

ステージは、完了時にのみ部分的に塗りつぶされたグループを放出しますが、キャンセルされたステージは放出しません。あなたが完了した時点で、下流部分のグループを放出する( killswitch後をグループ化)以下のグラフながら

val switch = 
    Source.tick(10.millis, 175.millis, "tick") 
      .grouped(10) 
      .viaMat(KillSwitches.single)(Keep.right) 
      .toMat(Sink.foreach(println))(Keep.left) 
      .run() 

を観察したよう

これは、以下のグラフは動作します( killswitch前をグループ化された)ことを意味

val switch = 
    Source.tick(10.millis, 175.millis, "tick") 
      .viaMat(KillSwitches.single)(Keep.right) 
      .grouped(10) 
      .toMat(Sink.foreach(println))(Keep.left) 
      .run() 
+0

ありがとう、それは動作します! –

+0

@ stanislav.chetvertkovあなたは非常に歓迎です:)あなたは親切に答えを受け入れることができますか? –

+0

説明をありがとう。いい答えだ。 –

関連する問題