2017-01-24 8 views
1

同じ入力で2つのフローをパラレル化するグラフを作成しました。このフローによってFuture [Option [Entity]]が生成されます。 flowAが失敗した場合、私は未来の[なし]が、いないように回復 Akkaストリームグラフの回復問題

val graph: Flow[Input, (Future[Option[Entity]], Future[Option[Entity]]), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val broadcast = builder.add(Broadcast[Input](2)) 
    val zip = builder.add(Zip[Future[Option[Entity]], Future[Option[Entity]]]) 

    val flowAwithRecovery = flowA.recover{ case t: Throwable => 
     logger.error(t, "Error retrieving output from flowA. Resuming without them.") 
     Future.successful(None) 
    } 

    broadcast.out(0) ~> flowAwithRecovery ~> zip.in0 
    broadcast.out(1) ~> flowB ~> zip.in1 

    FlowShape(broadcast.in, zip.out) 
    }) 

私は、グラフを実行し、flowAがインクルードが回復に失敗しました今後は実行されません返す

と呼ばれるを返すようにしたいと思います。回避策として、処理の最後に未来を取り戻していますが、私はグラフをデザインするときにこの種のロジックを入れたいと思います。

答えて

1

recoverコンビネータは、例外がアップストリームから伝播されたときに機能します。 Future.failedは例外ではありませんが、有効な要素です。 あなたはあなたが本当にあなたの流れの周りFuture Sを渡す必要はない、別のノートに

flowA.map(_.recover{ case t: Throwable => 
     logger.error(t, "Error retrieving output from flowA. Resuming without them.") 
     None 
    }) 

ようなものが必要でしょうか? flowAflowBをビルドしてOption[Entity]を生成するときは、mapAsyncを使用する方が良いでしょう。