0
mapAsyncのようにストリームの一部を並列化したいが、未来はない。Akkaストリームで平行なflatMapConcatを実行する方法
現在のところ、以下の解決策がありますが、これにはプレーンなflatMapConcatがないマテリアライザーが必要です。
def flatMapConcatParallel[In, Out](parallelism: Int)(f: In => Source[Out, _])(implicit mat: Materializer): Flow[In, Out, NotUsed] = {
// TODO there should be a better way to add parallelism that avoids a run (and the need for a materializer)
Flow[In].mapAsync(parallelism){i =>
f(i).runWith(Sink.head)
}
}
と
if (parallel){
val parallelism = 4
Flow[Batch].via(flatMapConcatParallel(parallelism)(singleRun))
} else{
Flow[Batch].flatMapConcat(singleRun)
}
これは低レベル(GraphStageLogic)を移動することなく、既存の構造で実現することができる方法上の任意のヒント?
何について ---> f() --->
--d-c-b-a--> OrderedBalance ---> f() ---> OrderedMerge --d'-c'-b'-a'--->
---> f() --->
?ありがとう。 – Somatik
ようこそ。もちろん、Future.applyメソッドを使用して独自のExecutionContextを提供することもできます。 –
私は少し速かったです。このソリューションは、ソースグラフの作成を並列化しますが、ストリームの実行は並列化しません。 – Somatik