0
私はいくつかのデータを送信するサブジェクトがあります。次に、キュー内のこのデータを変更したい(長い操作)。しかし、別の主題は、今、私はそれらの要素がすでにconcatMap
(によって消費されていない場合otherSubject
(3と4)でemitedの要素をスキップするために私の使い捨てを変更したい、私はRxJava 2 - concatMapのキューを減らす
PublishSubject<Integer> myBaseSubject = PublishSubject.create();
PublishSubject<Integer> otherSubject = PublishSubject.create();
myBaseSubject
.concatMap(integer -> Observable.timer(5, TimeUnit.SECONDS) // long operation
.map(aLong -> integer)
)
.subscribe(
integer -> Log.i(TAG, "result: " + integer),
Functions.emptyConsumer()
);
myBaseSubject.onNext(1);
myBaseSubject.onNext(2);
myBaseSubject.onNext(3);
myBaseSubject.onNext(4);
myBaseSubject.onNext(5);
otherSubject.onNext(3);
otherSubject.onNext(4);
myBaseSubject.onNext(6);
myBaseSubject.onNext(7);
私のキューを減らしたい他のものを放出するとき、彼らはまだ待ち行列に入っている)。
これは可能ですか?
PS:PublishSubjectが必要とされていない - それはちょうど簡潔