2016-08-03 6 views
3

akkaストリームRunnableグラフを直ちに停止する方法を理解できませんか?これを達成するためにkillswitchを使用するには?私がアクアストリームを始めたのはほんの数日でした。私の場合、私はファイルから行を読み込み、流し込みやシンクへの書き込みでいくつかの操作をしています。私がしたいことは、いつでもファイルをすぐに読むことをやめて、実行中のグラフ全体を停止させる必要があると思います。これに関するアイデアは非常に高く評価されます。akkaストリームを突然停止する方法Runnable Graph?

ありがとうございます。一つの方法は、グラフ解約

val graph= 
    Source.tick(FiniteDuration(0,TimeUnit.SECONDS), FiniteDuration(1,TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println)) 
    val cancellable=graph.run() 

    cancellable.cancel 

がcancellable.cancelがActorSystem.registerOnTerminationの一部とすることができる呼び出すことができるサービス又はshutdownhookupを有する

答えて

2

アッカストリーム2.4.3ので、エレガントありますKillSwitch経由で外部からのストリームを停止する方法。

次の例では、10秒後にストリームを停止します。

object ExampleStopStream extends App { 

    implicit val system = ActorSystem("streams") 
    implicit val materializer = ActorMaterializer() 

    import system.dispatcher 

    val source = Source. 
    fromIterator(() => Iterator.continually(Random.nextInt(100))). 
    delay(500.millis, DelayOverflowStrategy.dropHead) 
    val square = Flow[Int].map(x => x * x) 
    val sink = Sink.foreach(println) 

    val (killSwitch, done) = 
    source.via(square). 
    viaMat(KillSwitches.single)(Keep.right). 
    toMat(sink)(Keep.both).run() 

    system.scheduler.scheduleOnce(10.seconds) { 
    println("Shutting down...") 
    killSwitch.shutdown() 
    } 

    done.foreach { _ => 
    println("I'm done") 
    Await.result(system.terminate(), 1.seconds) 
    } 

} 
+0

以下のコード例を参照してください:以下のグラフを停止し、大容量のファイルを読み込み解除する方法 を? RunnableGraph.fromGraph(GraphDSL.create(){ 暗黙ビルダー=> インポートGraphDSL.Implicits._ //ソースファイル ヴァルAから: の.outアウトレット[文字列] = builder.add(readFileLineByLine) ()を実行 A〜> E ClosedShape } .INインレット[文字列] = builder.add(writeLinesToFile)) – PainPoints

+0

あなたが知っているような例を提供しています。// ヴァルEを提出し、各ラインを書きますあなたが起動する前にシステムを停止することができますが、私の場合はいつでも実行しているシステムを停止する必要があります。 – PainPoints

+0

私はtthisの作業に行くアッカ・ストリームテストから、以下を参照してください。 ヴァル・スイッチ1 = KillSwitches.shared( "スイッチ") ヴァル下流= RunnableGraph.fromGraph(GraphDSL.create(){ 暗黙のビルダー=> インポートGraphDSLを。 Implicits._ //ソースファイル ヴァルAから: ヴァルEファイルに各ラインの書き込み //の.outアウトレット[文字列] = builder.add(readFileLineByLine):入口[文字列] = builder.addを(writeLinesToFile ).in A.via(switch1.flow)〜> E ClosedShape })。run() switch1.s​​hutdown() – PainPoints

関連する問題