2016-08-25 4 views
0

乱数のストリームがあります。同時に放出された2つのイベントの流れをどのように分割して処理するのですか?

rx.Observable 
.range (0, 1000) 
.map (() -> 200d * Math.random()) 

フローが2つに分割されている必要があります。 100未満の数字と100を超える数字。

その後、100未満の数字(チェーン1)の場合: ネットワークにrequest1を実行し、回答を待ち、他の演算子の処理チェーン1を続行する必要があります。

100(chain2)以上の数字の場合: 私は別のrequest2をお送りし、回答を待ってプロセスチェーンオペレータを続けてください。

request1request2はお互いを待たずにチェーンが並行して実行されます。しかし、連鎖内では、処理要求を待つ必要があります。

どうすればよいですか?

答えて

0
rx.Observable 
         .create(subscriber -> { 
          for (int i = 0; i < 100; i++) { 
           subscriber.onNext(i); 
           Log.i("Iniop", "Create thread name: " + Thread.currentThread().getName()); 
          } 
          subscriber.onCompleted(); 
         }) 
         .onBackpressureBuffer() 
         .observeOn(Schedulers.computation()) 
         .subscribeOn(Schedulers.computation()) 
         .map(v -> { 
          Log.i("Iniop", "Map thread name: " + Thread.currentThread().getName()); 
          return 200d * Math.random(); 
         }) 
         .groupBy(k -> { 
            Log.i("Iniop", "Group thread name: " + Thread.currentThread().getName()); 
            return k > 100 ? "yes" : "no"; 
           } 
           , v -> v) 
         .forEach(gO -> gO.observeOn(Schedulers.newThread()) 
             .map(v -> new Pair<String, Double>(gO.getKey(), v)) 
             .subscribe(v -> { 
                Log.i("Iniop", "Key: " + v.first); 
                Log.i("Iniop", "Value: " + v.second); 
                Log.i("Iniop", "Thread name: " + Thread.currentThread().getName()); 
               } 
               , e -> Log.e("Iniop", "Err", e)) 
           , e -> Log.e("Iniop", "Err", e)); 
+0

演算子スケジューラで実行されるように 'map'および' groupBy'演算子の前に 'subscribeOn'を移動したいかもしれません。 – JohnWowUs

関連する問題