1
に配列要素を操作します次のようになりますが、私はそれが要素で終わっており、Seq[C]
ではないことを知っています。は、私は次のような2つの流れを持っているアッカフロー
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
このシナリオではSeq
をどのように保持できますか?
に配列要素を操作します次のようになりますが、私はそれが要素で終わっており、Seq[C]
ではないことを知っています。は、私は次のような2つの流れを持っているアッカフロー
Flow[A].via(aToSeqOfB).mapConcat(_.toList).via(bToC)
このシナリオではSeq
をどのように保持できますか?
間接回答
私の意見では、あなたの質問には、アッカのストリームを扱う際に共通である「新人のミス」のいずれかを強調しています。 akkaストリーム構造内にビジネスロジックを置くのは、通常、良い組織ではありません。あなたが持っていた場合
val bToC : Flow[B, C, NotUsed] = Flow[B] map { b : B =>
//business logic
}
は、より理想的なシナリオは次のようになります:
//normal function, no akka involved
val bToCFunc : B => C = { b : B =>
//business logic
}
val bToCFlow : Flow[B,C,NotUsed] = Flow[B] map bToCFunc
上記の「理想」の例ではFlow
は上だけ薄いベニヤであるあなたの質問は、フォームの何かを持っていることを示していますノンアカ、ビジネスロジックのトップ。
別々のロジックは、単純にあなたの元の質問を解決することができます:
val aToSeqOfC : Flow[A, Seq[C], NotUsed] =
aToSeqOfB via (Flow[Seq[B]] map (_ map bToCFunc))
直接回答
あなたのコードを再編成することができない場合は、唯一の選択肢は、先物に対処することです。あなたは別のサブストリーム内bToC
を使用する必要があります:
val mat : akka.stream.Materializer = ???
val seqBToSeqC : Seq[B] => Future[Seq[C]] =
(seqB) =>
Source
.apply(seqB.toIterable)
.via(bToC)
.to(Sink.seq[C])
.run()
あなたは、あなたが探しているフローを構築するためにmapAsync
以内にこの機能を使用することができます。
val parallelism = 10
val aToSeqOfC: Flow[A, Seq[C], NotUsed] =
aToSeqB.mapAsync(parallelism)(seqBtoSeqC)
私のコード 'seqBToSeqCで'はコンパイルされません。 'Source.apply(_)'の署名が無効であることを示しています。 –
それは私のためにコンパイルされますが、答えは歓迎されます。 –
それを試してみてください。署名なしと同じです。タイプミスマッチが予想される 'Iterable [NotInferedT]'実際の 'Seq [B]' –