注意
ヴィクトルクランは、コメントで述べたように:入力要素数と出力に対して1:両方の流れ、flow1
& flow2
は、1であることが知られている場合Tuple2[O,O2]
にジッピングにのみ実行可能です要素数。
グラフベースのソリューション
タプル構築物はGraphの内部に作成することができます。実際には、あなたの質問にはほぼ完全入門例と一致します。
リンクのサンプルコードを拡張し、あなたが使用することができますBroadcastとZip
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int]()) //different than link
val f1, f2, f4 = Flow[Int].map(_ + 10)
val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})//end RunnableGraph.fromGraph
ややハックストリームソリューション
純粋なストリームソリューションをお探しの場合、中間ストリームを使用することはできますが、Mat
は維持できないだろうし、それは各入力要素のために2つのストリームの実体を伴う:
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[O] = Source
.single(i)
.via(flow1)
.to(Sink.head[O])
.run()
val o2 : Future[O2] = Source
.single(i)
.via(flow2)
.to(Sink.head[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
ジェネリックビュン
あなたは、ほとんどのフローのために、このビュンは一般的なようにしたい場合は、出力タイプの意志(Seq[O], Seq[O2])
である必要があります。このタイプは、上記andFlows
関数でSink.seq
代わりのSink.head
を用いて生成することができる:1(1つの出力1つの入力)がそのような一般的なコンビネータは難しいであろう。
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[Seq[O]] = Source
.single(i)
.via(flow1)
.to(Sink.seq[O])
.run()
val o2 : Future[Seq[O2]] = Source
.single(i)
.via(flow2)
.to(Sink.seq[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
Aフロー厳密に1ではありません。 (GraphDSLを使用し、ブロードキャスト+マージを使用することができます) –