2017-09-21 10 views
0

私はAkkaストリームのエラー処理。どの行が失敗したかを知る方法?

http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html

を扱うアッカストリームエラーでこの記事を読んで、このコードを書きました。

val decider: Supervision.Decider = { 
    case _: Exception => Supervision.Restart 
    case _ => Supervision.Stop 
} 

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

val source = Source(1 to 10) 
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")} 
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x)) 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s => 
    import GraphDSL.Implicits._ 
    source ~> flow ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
future.onComplete{ _ => 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 

これは私が処理されませんでした行を参照するために出力をスキャンする必要があることを除いて....非常によく動作します。失敗した行を印刷/ログする方法はありますか? [私が書いた各フローに明示的なtry/catchブロックを置かずに]

たとえば、私は俳優(ストリームとは対照的に)を使用していた場合、俳優のライフサイクルイベントを書くことができました。再起動時に処理中のメッセージとともにアクターが再起動したときにログに記録されます。

ここでは、(私は内部的に使用されていますが)明示的にアクターを使用していません。フロー/ソース/シンクのライフサイクルイベントはありますか?

+0

として いくつかのロギングライブラリを使用することをお勧めしますログアウト(またはあなたがそれらで何をしたいか)? – Tyler

+0

だからストリームをフォークするためには、あなたはまだすべての例外を捕まえなければならず、キャッチブロックのルートで別のフローに...非常に面倒ではないでしょうか?失敗したメッセージでストリームをフォークするのは簡単ですか? –

+0

ああ、私はあなたの例を理解していませんでした。私はあなたの条件文があなたが事前に見ている事件を知っていることを示したと思った。 – Tyler

答えて

1

あなたのコードにほんの少しの修正:

val decider: Supervision.Decider = { 
    case e: Exception => 
    println("Exception handled, recovering stream:" + e.getMessage) 
    Supervision.Restart 
    case _ => Supervision.Stop 
} 

あなたは、ストリームであなたの例外、例えばラインに意味のあるメッセージを渡す場合は、監督の決定者でそれらを印刷することができます。

私は迅速かつ短い答えを与えるためにprintlnを使用しますが、強くあなたがストリームをフォークし、彼らは異なる「悪い」キューに悪いものをプッシュするだろうなscala-logging

関連する問題