Akkaストリームは、私の定型コードを大幅に削減し、多くの便利な機能を含んでいます。しかし、アイテムが処理される速度を制限できる必要があります。問題は、(単一のオンラインサイトから)時間の経過とともにダウンロードするリソースのソースリンクに接続されたHazelcastキューを供給しているが、キューに入るリンクの数がかなり大きくなる可能性があるということです。理想的には、一度に50〜60件を超えるリクエストは実行されません。 Akka Streamsには、一度に処理されるアイテムの数を制限する機能がありますか?Akka Streams。 Akkaストリームで一度に処理されるアイテムの数を制御する
さらに制限は、複雑な状態管理、コード処理、および特定のWebサイトとのやりとりにおける他の機能の必要性です。 Akka Httpはここで助けができません。私のネットワークコードはJsoupとApache Http Componentsで書かれており、スクリプトをレンダリングするJavaFXベースのサーバーを呼び出すことがあります。
ドキュメントで説明したように、バッファを使用して入力の速度を制御するために私の現在の試みは、以下:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
ありがとうございます。私はこの機能について完全に光っています:) –
興味深いメモ。並行性レベルが低すぎると、パイプが詰まっているように見えます。これは、並行処理レベルより最終的な処理で処理する要素が少ない場合にも、処理の最後にも発生します。これに気付いたことがありますか?これらの記録を強制する方法はありますか? –