2017-11-18 13 views
7

次のことを行うためのAkkaストリームコンビネータがありますか? (今のところandそれを呼びましょう。)2つのフローを並べるにはどうすればいいですか?

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

意味はどんなソースは、その要素が両方Flow秒に渡され、その出力はタプルとして新しいFlowに統合されることになります。 (圏論から矢印に精通した者は、関数型プログラミングを風味のために、私は&&&のようなものを探しています。)つまり、zipalsoToを関連見えたライブラリに2つのコンビネータがあり

。しかし、前者はSourceShapeを、後者はSinkShapeを受け入れます。どちらも、GraphShapeを認めません。これはなぜですか?

私のユースケースは、次のようなものです:

someSource 
    .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) 
    .runWith(someSink) 

これは動作しますが、私が探しています:

someSource 
    .via(someFlowThatReturnsUnit.and(Flow.apply)) 
    .runWith(someSink) 

.andのようなものを見つけるために失敗すると、私はこのような私の元Flowを修正しましたより洗練された、より組成的な溶液。

+1

Aフロー厳密に1ではありません。 (GraphDSLを使用し、ブロードキャスト+マージを使用することができます) –

答えて

6

注意

ヴィクトルクランは、コメントで述べたように:入力要素数と出力に対して1:両方の流れ、flow1 & flow2は、1であることが知られている場合Tuple2[O,O2]にジッピングにのみ実行可能です要素数。

グラフベースのソリューション

タプル構築物はGraphの内部に作成することができます。実際には、あなたの質問にはほぼ完全入門例と一致します。

enter image description here

リンクのサンプルコードを拡張し、あなたが使用することができますBroadcastZip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.ignore 

    val bcast = builder.add(Broadcast[Int](2)) 

    val merge = builder.add(Zip[Int, Int]()) //different than link 

    val f1, f2, f4 = Flow[Int].map(_ + 10) 

    val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link 

    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
       bcast ~> f4 ~> merge 
    ClosedShape 
})//end RunnableGraph.fromGraph 

ややハックストリームソリューション

純粋なストリームソリューションをお探しの場合、中間ストリームを使用することはできますが、Matは維持できないだろうし、それは各入力要素のために2つのストリームの実体を伴う:

def andFlows[I, O, O2] (maxConcurrentSreams : Int) 
         (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
         (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[O] = Source 
          .single(i) 
          .via(flow1) 
          .to(Sink.head[O]) 
          .run() 

    val o2 : Future[O2] = Source 
          .single(i) 
          .via(flow2) 
          .to(Sink.head[O2]) 
          .run() 

    o zip o2 
    }//end Flow[I].mapAsync 

ジェネリックビュン

あなたは、ほとんどのフローのために、このビュンは一般的なようにしたい場合は、出力タイプの意志(Seq[O], Seq[O2])である必要があります。このタイプは、上記andFlows関数でSink.seq代わりのSink.headを用いて生成することができる:1(1つの出力1つの入力)がそのような一般的なコンビネータは難しいであろう。

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) 
           (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
           (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[Seq[O]] = Source 
           .single(i) 
           .via(flow1) 
           .to(Sink.seq[O]) 
           .run() 

    val o2 : Future[Seq[O2]] = Source 
           .single(i) 
           .via(flow2) 
           .to(Sink.seq[O2]) 
           .run() 

    o zip o2 
    }//end Flow[I].mapAsync 
関連する問題