2
Iチェーン連続する各ステージは、前のSuccess
場合を処理し、終了時にSink
を総称全てFailure
を処理フォームa -> Try[b]
のFlow
年代の一連たいです。単項短絡
これなどは簡潔にエンコードできますか?それは実際には線形の流れですが、どのステージでも放送と合併がいかに短いかわかりません。
Iチェーン連続する各ステージは、前のSuccess
場合を処理し、終了時にSink
を総称全てFailure
を処理フォームa -> Try[b]
のFlow
年代の一連たいです。単項短絡
これなどは簡潔にエンコードできますか?それは実際には線形の流れですが、どのステージでも放送と合併がいかに短いかわかりません。
object PartitionTry {
def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
import GraphDSL.Implicits._
val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
partition ~> failure
partition ~> success
new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
}
}
そして、その結果に応じて、Try
2にストリームを分割するファンアウトの段階を定義することです
Try
Sを摂取してにFailure
Sを送信することができ、あなたの一般的な流れ
object ErrorHandlingFlow {
def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
import GraphDSL.Implicits._
val partition = builder.add(PartitionTry[T]())
partition.out0 ~> sink
new FlowShape[Try[T], T](partition.in, partition.out1)
}
)
}
使用例にSuccess
ESを渡しながら、好みのシンク、
上記で定義
Either
などありがとう! –
'recover'とflatMapの結果をマップできませんか?多分私は誤解しています。 :) – erip