2017-11-30 8 views

答えて

0

まずスニペットここ

各着信バルク、Futureに変換されるで

Flow[RandomCdr] 
     .grouped(bulkSize) 
     .flatMapConcat{ (bulk : Seq[RandomCdr]) => 
     Source.fromFuture(collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)) 
     } 
     .toMat(Sink.ignore)(Keep.right) 


Flow[RandomCdr] 
    .grouped(bulkSize) 
    .map((bulk : Seq[RandomCdr]) => collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec)) 
    .toMat(Sink.ignore)(Keep.right) 

を[完了] dはFutureが提供する実行コンテキスト内で実行されると述べています。この時点でのみ、別のFutureなどを生成して次のバルクを処理します。

基本的に先物は順番に実行されます。これは、あなたが提供する実行コンテキスト内で実行されます

Flow[RandomCdr] 
     .grouped(bulkSize) 
     .mapAsync(parallelism = 1){ (bulk : Seq[RandomCdr]) => 
     collection.flatMap(_.insert[RandomCdr](false)(randomCdrWriter,ec).many(bulk)(ec))(ec) 
     } 
     .toMat(Sink.ignore)(Keep.right) 

セカンドスニペットここ

各着信バルクがFutureに変換される、と行動が似ています。 FutureはすぐにSink.ignoreに渡され、その参照は破棄されます。

Futureが同時に実行される回数を制御することはできません。このため、この方法はお勧めしません。

改善された並列性を探している場合は、上記のようにmapAsyncを使用して、並列性パラメータを微調整してください。

+0

これは、2番目のスニペットが背圧を提供しないことを意味しますか? – vgkowski

+0

正しいですが、それはバックプレッシャーなしで可能な限り多くの未来を吹き飛ばします –

関連する問題