2017-08-29 16 views

答えて

2

間接回答

私の意見では、あなたの質問には、アッカのストリームを扱う際に共通である「新人のミス」のいずれかを強調しています。 akkaストリーム構造内にビジネスロジックを置くのは、通常、良い組織ではありません。あなたが持っていた場合

val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B => 
    //business logic 
} 

は、より理想的なシナリオは次のようになります:

//normal function, no akka involved 
val bToCFunc : B => C = { b : B => 
    //business logic 
} 

val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc 

上記の「理想」の例ではFlowは上だけ薄いベニヤであるあなたの質問は、フォームの何かを持っていることを示していますノンアカ、ビジネスロジックのトップ。

別々のロジックは、単純にあなたの元の質問を解決することができます:

val aToSeqOfC : Flow[A, Seq[C], NotUsed] = 
    aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc)) 

直接回答

あなたのコードを再編成することができない場合は、唯一の選択肢は、先物に対処することです。あなたは別のサブストリーム内bToCを使用する必要があります:

val mat : akka.stream.Materializer = ??? 

val seqBToSeqC : Seq[B] => Future[Seq[C]] = 
    (seqB) => 
    Source 
     .apply(seqB.toIterable) 
     .via(bToC) 
     .to(Sink.seq[C]) 
     .run() 

あなたは、あなたが探しているフローを構築するためにmapAsync以内にこの機能を使用することができます。

val parallelism = 10 

val aToSeqOfC: Flow[A, Seq[C], NotUsed] = 
    aToSeqB.mapAsync(parallelism)(seqBtoSeqC) 
+0

私のコード 'seqBToSeqCで'はコンパイルされません。 'Source.apply(_)'の署名が無効であることを示しています。 –

+0

それは私のためにコンパイルされますが、答えは歓迎されます。 –

+0

それを試してみてください。署名なしと同じです。タイプミスマッチが予想される 'Iterable [NotInferedT]'実際の 'Seq [B]' –

関連する問題