2017-12-12 8 views
0

を放射されたX項目よりも少ない場合、私はOBS1、obs2、obs3、...、

それらのそれぞれ(のMongoDBデータベースからの)項目の数を発することができる観測のリストを持っているスイッチは、私が興味を持っています最初のN個のアイテムにのみ表示されます。私は、必要に応じてオブザーバブルのクエリが実行されるようにしたいと思います。つまり、たとえばobs1がNより大きい値を生成した場合、obs2の後のクエリは実行されません。

私がconcatを使用すると、Observable(obs1、obs2、obs3、...)。concat、all Nアイテム未満である:クエリはMongoDBの基本的

で並列に実行することができ、私はX

..... obs1.switchIfX(obs2).switchIfX(obs3)のような操作を探しています現在の観測可能なものから放出される。

どのように私はこの要件をrxscalaスタイルで実装できますか?

+0

Nが10で、 'obs1'が5を生成した場合、その5を下流で利用できるようにするか、無視して'obs2 'を購読するべきですか? – akarnokd

答えて

2

あなたが(未テスト)このような何かを試みることができる:

Observable.just(obs1, obs2, obs3).flatten(maxConcurrent=1).take(N) 

maxConcurrent引数はflattenは一度に観察できるものに加入し、そしてNアイテムが放出された後、takeは上流から退会することが保証さその時点でobs2またはobs3がまだ購読されていない場合、それらは決して実行されません。

0

あなたは、リストにソースからアイテムを収集flatMapオペレータにそのサイズをチェックして、長さが十分でない場合は、別のソースに切り替えることができます:

@Test 
public void test() { 
    Observable.range(1, 5) 
    .compose(switchIfFewer(Observable.range(1, 8), 10)) 
    .compose(switchIfFewer(Observable.range(1, 15), 10)) 
    .test() 
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15); 
} 

static <T> ObservableTransformer<T, T> switchIfFewer(Observable<T> other, int n) { 
    return o -> { 
     return o.toList() 
     .flatMapObservable(list -> { 
      if (list.size() < n) { 
       return other; 
      } 
      return Observable.fromIterable(list); 
     }); 
    }; 
} 

あなたが取得したくない場合N個以上のアイテムの場合は、代わりにo.take(n).toList()と指定できます。

+0

ありがとう、非常に役に立ちますが、私が望むのは、obs1、obs2、obs3の順番で最初のNです。これはあなたの例では、私は1,2,3,4,5,1,2,3,4,5を期待しています、基本的にobs1とobs2だけが放射し、obs3はありません。 –

+0

私は分かりません。各ソースから最初のN個の要素が必要ですか?これは 'just(obs1、obs2、obs3).concatMap(o-> o.take(N))'のようなものです。 – akarnokd

+0

合計でN個の要素はありません。 N = 5、obs1 emit 2、obs2 emit 4、obs1から2要素、obs2から3要素、obs3を無視します。すでに5を収集しています。 –