私はAkkaストリームのエラー処理。どの行が失敗したかを知る方法?
http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html
を扱うアッカストリームエラーでこの記事を読んで、このコードを書きました。
val decider: Supervision.Decider = {
case _: Exception => Supervision.Restart
case _ => Supervision.Stop
}
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
val source = Source(1 to 10)
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
future.onComplete{ _ =>
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
これは私が処理されませんでした行を参照するために出力をスキャンする必要があることを除いて....非常によく動作します。失敗した行を印刷/ログする方法はありますか? [私が書いた各フローに明示的なtry/catchブロックを置かずに]
たとえば、私は俳優(ストリームとは対照的に)を使用していた場合、俳優のライフサイクルイベントを書くことができました。再起動時に処理中のメッセージとともにアクターが再起動したときにログに記録されます。
ここでは、(私は内部的に使用されていますが)明示的にアクターを使用していません。フロー/ソース/シンクのライフサイクルイベントはありますか?
として いくつかのロギングライブラリを使用することをお勧めしますログアウト(またはあなたがそれらで何をしたいか)? – Tyler
だからストリームをフォークするためには、あなたはまだすべての例外を捕まえなければならず、キャッチブロックのルートで別のフローに...非常に面倒ではないでしょうか?失敗したメッセージでストリームをフォークするのは簡単ですか? –
ああ、私はあなたの例を理解していませんでした。私はあなたの条件文があなたが事前に見ている事件を知っていることを示したと思った。 – Tyler