2016-03-21 10 views
3

this thread準拠conCatMapとフラットマップは、アイテムが発行される順序によって異なります。だから私はテストを行い、整数の単純なストリームを作成し、どのような順序でそれらが放出されるのか見たいと思った。私は、1から5までの範囲の数字をとり、2つずつ複数の数値を取る小さな観測値を作った。簡単です。ここでRxJava - flatmapとconcatMap - なぜサブスクリプションで同じものを注文していますか?

はflatmapとコードです:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call(Integer integer) { 
      return Observable.just(integer * 2); 
     } 
    }).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(Integer integer) { 
     Log.v("myapp","from flatMap:"+integer); 
     } 
    }); 

とconcatMapを使用して、まったく同じコード:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call(Integer integer) { 
      return Observable.just(integer * 2); 
     } 
    }).subscribe(new Observer<Integer>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(Integer integer) { 
     Log.v("myapp","from concatmap:"+integer); 
     } 
    }); 

私は順序が両方とも同じであったログにプリントアウトを見たとき、なぜ?私はconcatMapだけが順序を保持すると思った?

答えて

9

あなたが見ているのは偶然です。 flatMapが値を返すたびに、前のスレッドと同じスレッドで値を返します。私は、マルチスレッドを利用するためにあなたの例を変更した

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 
     .flatMap(integer -> Observable.just(integer) 
       .observeOn(Schedulers.computation()) 
       .flatMap(i -> { 
        try { 
         Thread.sleep(new Random().nextInt(1000)); 
         return Observable.just(2 * i); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
         return Observable.error(e); 
        } 
       })) 
     .subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("onCompleted")); 

私は別の順序を強制的にランダムな遅延によって各2 * i値を遅らせています。また、observeOn(Schedulers.computation())を追加したので、次の演算子(flatMap)が計算スレッドプール上で実行されます。これはマルチスレッドマジックを行います。

これは私が(Androidの上で)私の例のために得る出力されます:私はconcatMapjustflatMapを交換する場合

I/System.out: 6 
I/System.out: 4 
I/System.out: 12 
I/System.out: 14 
I/System.out: 8 
I/System.out: 2 
I/System.out: 16 
I/System.out: 20 
I/System.out: 10 
I/System.out: 18 
I/System.out: onCompleted 

、その後、私は適切に注文した出力が得られます。

適切な説明があるgreat post by Thomas Nieldがあります。

+0

リンクを修正してください。 https://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.htmlにする必要がありますか? –

+0

@XiaoPeng完了、ありがとう –

関連する問題