2017-12-13 5 views
0

私はいくつかのデータを送信するサブジェクトがあります。次に、キュー内のこのデータを変更したい(長い操作)。しかし、別の主題は、今、私はそれらの要素がすでにconcatMap(によって消費されていない場合otherSubject(3と4)でemitedの要素をスキップするために私の使い捨てを変更したい、私はRxJava 2 - concatMapのキューを減らす

PublishSubject<Integer> myBaseSubject = PublishSubject.create(); 
    PublishSubject<Integer> otherSubject = PublishSubject.create(); 

    myBaseSubject 
      .concatMap(integer -> Observable.timer(5, TimeUnit.SECONDS) // long operation 
        .map(aLong -> integer) 
      ) 
      .subscribe(
        integer -> Log.i(TAG, "result: " + integer), 
        Functions.emptyConsumer() 
      ); 

    myBaseSubject.onNext(1); 
    myBaseSubject.onNext(2); 
    myBaseSubject.onNext(3); 
    myBaseSubject.onNext(4); 
    myBaseSubject.onNext(5); 

    otherSubject.onNext(3); 
    otherSubject.onNext(4); 

    myBaseSubject.onNext(6); 
    myBaseSubject.onNext(7); 

私のキューを減らしたい他のものを放出するとき、彼らはまだ待ち行列に入っている)。

これは可能ですか?

PS:PublishSubjectが必要とされていない - それはちょうど簡潔

答えて

0

のために使われているあなたはconcatMapによって処理されていないすべき要素を追跡するConcurrentHashMapを持つことができます。私は、アイテムを処理しないという決定は、そのアイテムのインデックスが作成された後に起こると仮定しています。

ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(); 

myBaseSubject 
    .doOnNext(v -> map.put(v, v)) 
    .concatMap(integer -> { 
     if (map.remove(integer) != null) { 
      return Observable.timer(5, TimeUnit.SECONDS) // long operation 
       .map(aLong -> integer) 
     } 
     return Observable.empty(); 
    }) 
    .subscribe(
     integer -> Log.i(TAG, "result: " + integer), 
     Functions.emptyConsumer() 
    ); 

myBaseSubject.onNext(1); 
myBaseSubject.onNext(2); 
myBaseSubject.onNext(3); 
myBaseSubject.onNext(4); 
myBaseSubject.onNext(5); 

map.remove(3); 
map.remove(4); 

myBaseSubject.onNext(6); 
myBaseSubject.onNext(7); 
関連する問題