5

私は今すぐに説明している場合が非常に難しいので、問題を説明するために単純なバージョンを作成します。RxJavaでスループットを制限

私は、ファイルのArrayListで定義されたファイルのシーケンスを放出するObservable.from()を持っています。これらのファイルはすべてサーバーにアップロードする必要があります。そのために私は仕事をしてObservableを返す関数を持っています。

Observable<Response> uploadFile(File file); 

私はそれが狂気を取得、このコードを実行すると、Observable.from()は、ファイルのすべてを放出し、それらはすべてのものに、あるいは少なくともそれが処理できるスレッドの最大のためにアップロードされています。

最大2つのファイルを同時にアップロードしたいと考えています。私のためにこれを処理できる演算子はありますか?

私はバッファウィンドウや他のいくつかを試してみましたが、彼らは唯一の代わりに、常に二つの平行なファイルのアップロードを持つの2つのアイテムを発するように思われます。アップロード部分に最大スレッドプールを設定しようとしましたが、私の場合は使用できません。

この権利のための簡単な演算子が必要ですか?何か不足していますか?

答えて

4

flatMap()を使用しているため、すべてのファイルが同時にアップロードされると思います。すべての変換を同時に実行します。代わりに、concatMap()を使用して、別の変換を実行してください。そして、2つの並行アップロードを実行するには、window(2)をあなたのファイルに観測可能にしてから、あなたのコードと同じようにflatMap()を呼び出す必要があります。

Observable<Response> responses = 
    files 
     .window(2) 
     .concatMap(windowFiles -> 
     windowFiles.flatMap(file -> uploadFile(file)); 
    ); 

UPDATE

私は正確に何をしたいんよりよい解決策を見つけました。同時スレッドの最大数を受け入れる、オーバーロードのflatMap()があります。

Observable<Response> responses = 
    files 
     .onBackpressureBuffer() 
     .flatMap(index -> { 
     return uploadFile(file).subscribeOn(Schedulers.io()); 
     }, 2); 
+0

これは完璧ですね!私はそれを試し、あなたに知らせるでしょう。 –

+0

窓のオペレーターがうまくいきました。どうすればそのウィンドウを動かすことができますか?今すぐウィンドウがアップロードのためにファイル1とファイル2を発行する場合、両方が終了するまで待ちます。ファイル2のアップロードが完了し、ファイル1がまだ進行中の場合、ファイル3のアップロードはすでに実行できますか? –

+0

デフォルトの演算子では可能かどうかわからないので、独自の演算子を書く必要があります。 – Michael