2017-06-09 3 views
2

Akkaストリームは、私の定型コードを大幅に削減し、多くの便利な機能を含んでいます。しかし、アイテムが処理される速度を制限できる必要があります。問題は、(単一のオンラインサイトから)時間の経過とともにダウンロードするリソースのソースリンクに接続されたHazelcastキューを供給しているが、キューに入るリンクの数がかなり大きくなる可能性があるということです。理想的には、一度に50〜60件を超えるリクエストは実行されません。 Akka Streamsには、一度に処理されるアイテムの数を制限する機能がありますか?Akka Streams。 Akkaストリームで一度に処理されるアイテムの数を制御する

さらに制限は、複雑な状態管理、コード処理、および特定のWebサイトとのやりとりにおける他の機能の必要性です。 Akka Httpはここで助けができません。私のネットワークコードはJsoupとApache Http Componentsで書かれており、スクリプトをレンダリングするJavaFXベースのサーバーを呼び出すことがあります。

ドキュメントで説明したように、バッファを使用して入力の速度を制御するために私の現在の試みは、以下:

val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf) 
val source = Source.fromGraph(sourceGraph)  
val (killSwitch, last) = source 
       .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure) 
       .viaMat(new DownloadFlow())(Keep.both) 
       .map(x => println(x)) 
       .to(Sink.ignore).run() 

答えて

2

順序があることを必要としない場合は、あなたが探しているメカニズムは、mapAsync(またはmapAsyncUnorderedですあなたの例のように保存されます)。 これらのコンビネータは、parallelismパラメータをとります。このパラメータは、ステージが実行できる並列タスクの数を制限する目的を持っています。

DownloadFlowの一部にする必要があります。 あなたDownloadFlow実行非同期コードを仮定すると、あなたはこのようにそれを構造化することができます:あなたのダウンロードの流れが意味のある値をマテリアライズしていたよう

def download(input: Input): Future[Output] = ??? 

val downloadFlow: Flow[Input, Output, NotUsed] = Flow[Input].mapAsyncUnordered(50)(download) 

val (killSwitch, last) = source 
       .buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure) 
       .viaMat(downloadFlow)(Keep.both) 
       .map(x => println(x)) 
       .to(Sink.ignore).run() 

を、それはおそらく、少し複雑になりますが、うまくいけばあなたのアイデアを得ます。

詳細はdocsを参照してください。

+0

ありがとうございます。私はこの機能について完全に光っています:) –

+0

興味深いメモ。並行性レベルが低すぎると、パイプが詰まっているように見えます。これは、並行処理レベルより最終的な処理で処理する要素が少ない場合にも、処理の最後にも発生します。これに気付いたことがありますか?これらの記録を強制する方法はありますか? –

関連する問題