2017-10-27 7 views
0

mapAsyncのようにストリームの一部を並列化したいが、未来はない。Akkaストリームで平行なflatMapConcatを実行する方法

現在のところ、以下の解決策がありますが、これにはプレーンなflatMapConcatがないマテリアライザーが必要です。

def flatMapConcatParallel[In, Out](parallelism: Int)(f: In => Source[Out, _])(implicit mat: Materializer): Flow[In, Out, NotUsed] = { 
    // TODO there should be a better way to add parallelism that avoids a run (and the need for a materializer) 
    Flow[In].mapAsync(parallelism){i => 
     f(i).runWith(Sink.head) 
    } 
    } 

if (parallel){ 
    val parallelism = 4 
    Flow[Batch].via(flatMapConcatParallel(parallelism)(singleRun)) 
} else{ 
    Flow[Batch].flatMapConcat(singleRun) 
} 

これは低レベル(GraphStageLogic)を移動することなく、既存の構造で実現することができる方法上の任意のヒント?

何について
      ---> f() ---> 
--d-c-b-a--> OrderedBalance ---> f() ---> OrderedMerge --d'-c'-b'-a'---> 
          ---> f() ---> 

答えて

2

:なぜ私はこれを自分自身を思い付くしていない

Flow[In].mapAsync(parallelism)(i => Future.successful(f(i))).flatMapConcat(identity) 
+0

?ありがとう。 – Somatik

+0

ようこそ。もちろん、Future.applyメソッドを使用して独自のExecutionContextを提供することもできます。 –

+0

私は少し速かったです。このソリューションは、ソースグラフの作成を並列化しますが、ストリームの実行は並列化しません。 – Somatik

関連する問題