1
私はRunnableGraph
のように次のようになります。 とmerge
ステージの間に単純なmap
がある場合、すべてが問題ありません。ただし、mapConcat
になると、このコードは最初の要素を消費した後には機能しません。Akka-stream mapConcatが循環RunnableGraphで動作しない
なぜ動作しないのか知りたい。
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val M = b.add(MergePreferred[Int](1))
val B = b.add(Broadcast[Int](2))
val S = Source(List(3))
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
ClosedShape
})
// run() output:
// 3
// List(2,2,2)
本当に有益です!あなたの説明の後、オーバーフロー時にメッセージを失いたくないので、私は 'conflateWithSeed'が自分の状況に適していることを知ります。 –