2017-08-17 3 views
1

私は、複数のイベントストリームをmerge()演算子と組み合わせ、限られた量の 'アクティブストリーム'を探しています。たとえば、6つのストリームをマージすると(すべてが別のスレッドで動作しています)、merge()はそれらのすべてにサブスクライブしますが、最初は3に、サブスクリプションを完了する6つのストリームのすべてが完了するまで繰り返す。マージや他のRxJava演算子でこれを達成することは可能ですか、それとも私自身で書くべきですか?RxJavaのマージ()でアクティブなストリームの数を制限する方法は?

+1

ちょうどmaxConcurrency' 'という名前のパラメータを持っているものを選ん:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#merge(java.lang.Iterable、 %20int) – akarnokd

答えて

1

マージ演算子でmaxConcurrency値を使用できますが、すべてのObservableを1つにまとめる必要があります。

@Test 
    public void testMergeMaxConcurrency() { 
     Observable.merge(Observable.just(
       Observable.just(3), 
       Observable.just(5), 
       Observable.just(1), 
       Observable.just(4), 
       Observable.just(2)), 2) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
       .doOnNext(Collections::sort) 
       .subscribe(System.out::println); 
    } 
関連する問題