2016-06-13 1 views
0

オブジェクトをバックグラウンドスレッドの別のオブジェクトにマップ/変換し、1回の会話が完了するとすぐにメインスレッドに配置します。1つのマップが完成したらすぐにRxJavaマップを作成して出力します

Observable.just(1,2,3,4,5) 
      .map(new Func1<Integer, String>() { 
       @Override 
       public String call(Integer integer) { 
        Log.d(TAG, "mapping number " + integer); 
        return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
       } 
      }) 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Observer<String>() { 
       @Override 
       public void onCompleted() { 
        Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onError(Throwable e) { 
       } 

       @Override 
       public void onNext(String integer) { 
        Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       } 
      }); 

結果は次のとおりです。変換はしばらく時間がかかったことがあり、私は、すぐに変換が行われるようにそれらを受け取ることを期待しかし

D: mapping number 1 
D: mapping number 2 
D: mapping number 3 
D: mapping number 4 
D: mapping number 5 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 

D: mapping number 1 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 2 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 3 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 4 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 5 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 
+3

あなたの質問がありますか? RxJavaはあなたの説明に似ています。あなたの質問のログは、変換が速すぎるためです。マップ関数に 'Thread.sleep(1000)'を追加すると、異なるログが表示されます。 – zsxwing

+0

@zsxwingうまくいきました。 – Pedram

答えて

3

グローバル・バッファ・サイズを設定する必要はありませんが、ちょうど使用observeOn(Scheduler, int)オーバーロードプリフェッチ値を1に指定できます。前の値が処理された場合にのみ次の値を要求します。

0

これは、上記で使用しているチェーンの演算子にバックプレッシャーを適用しているためです。 ObserveOnのような下流の演算子は、効率のために個々の項目ではなく、チャンクによって上流からのデータを要求します。あなたは1にバッファサイズを設定した場合、これは効果的に使用すると、効率のコストを期待するもので達成します:

-Drx.ring-buffer.size=1 

を具体的にそれは高価な往復電話を持っているアップストリームのためにかなりひどいだろう。

EDIT

あなたはソートのあなたのダウンストリームの排出量までシリアライズするBehaviorSubjectとジッパーを使用することができます。

BehaviorSubject<Void> signal = BehaviorSubject.create(); 
signal.onNext(null); // <- pair up the signal with the first item immediately 
Observable.just(1,2,3,4,5) 
     .zipWith(signal, (item,v)->item) //only emit a next item when there is a "receipt acknowledgement" from the down stream 
     .observeOn(Schedulers.newThread()) //<- needed to avoid fetching subsequent items in UI thread 
     .map(new Func1<Integer, String>() { 
      @Override 
      public String call(Integer integer) { 
       Log.d(TAG, "mapping number " + integer); 
       return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
      } 
     }) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Observer<String>() { 
      @Override 
      public void onCompleted() { 
       Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
      } 

      @Override 
      public void onError(Throwable e) { 
      } 

      @Override 
      public void onNext(String integer) { 
       Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       signal.onNext(null); //<- acknowledge receipt - allow emitting next item from upstream 
      } 
     }); 
関連する問題