2017-06-26 13 views
2

Iチェーン連続する各ステージは、前のSuccess場合を処理し、終了時にSinkを総称全てFailureを処理フォームa -> Try[b]Flow年代の一連たいです。単項短絡

これなどは簡潔にエンコードできますか?それは実際には線形の流れですが、どのステージでも放送と合併がいかに短いかわかりません。

+0

'recover'とflatMapの結果をマップできませんか?多分私は誤解しています。 :) – erip

答えて

1
これを解決する1つの方法は、

object PartitionTry { 
    def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒ 
     import GraphDSL.Implicits._ 

     val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a }) 
     val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t }) 
     val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1))) 

     partition ~> failure 
     partition ~> success 

     new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out) 
    } 
    } 

そして、その結果に応じて、Try 2にストリームを分割するファンアウトの段階を定義することです

Try Sを摂取してにFailure Sを送信することができ、あなたの一般的な流れ

以下

object ErrorHandlingFlow { 
    def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
     GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒ 
     import GraphDSL.Implicits._ 

     val partition = builder.add(PartitionTry[T]()) 

     partition.out0 ~> sink 

     new FlowShape[Try[T], T](partition.in, partition.out1) 
     } 
    ) 
    } 

使用例にSuccess ESを渡しながら、好みのシンク、

上記で定義

  • 2つのステージは、任意のタイプのために再使用可能であることに注意してください(どこでも使用し、一回書き込み)
  • このアプローチは、他のタイプのクラスのために再利用することができる - Eitherなど
よう
+0

ありがとう! –