2017-12-01 8 views
0

rxjava2を使用したいと思うのは、ダウンストリームリクエスト1、rxjava2プロデューサ - コンシューマ、 'downstream'リクエスト1、 'upstream'で1を送信

flatMapまたはobserveOnにはデフォルトのバッファサイズ128があるので、バッファサイズは1に設定されていますが、それも機能しません。私はs.request(1)一度だけ呼び出すため

flatMap:1 
flatMap:2 
onNext:1 
flatMap:3 

予想される出力、::

Flowable.defer((Callable<Publisher<Integer>>)() -> Flowable.range(1, 5)) 
      .flatMap((Function<Integer, Publisher<Integer>>) integer -> { 
       //do something with long time. 
       System.out.println("flatMap:" + integer); 
       return Flowable.just(integer); 
      }, false, 1) //=====> 1 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.computation(), false, 1) //=====> 2 
      .subscribe(new Subscriber<Integer>() { 
       @Override 
       public void onSubscribe(Subscription s) { 
        //request one 
        s.request(1); 
       } 

       @Override 
       public void onNext(Integer integer) { 
        System.out.println("onNext:" + integer); 
       } 

       @Override 
       public void onError(Throwable t) { 

       } 

       @Override 
       public void onComplete() { 

       } 
      }); 

実際の出力は

flatMap:1 
onNext:1 

答えて

0

をあなたの観察者は一つだけのアイテムを要求しますが、observeOn()は同様に一つの項目をバッファリングします。 flatMap()演算子自体は、連続する入力にサブスクライブします。

  1. オブザーバーはオブザーバーチェーンに加入し、1つのアイテムを要求します。
  2. observeOn()はそのバッファに1アイテムを要求します。
  3. range()オペレータが出す1.
  4. flatMap()は1を受信し、内部的にはフローティングに加入して最初のログ行を生成します。
  5. observeOn()は、そのバッファ用に1つのアイテムを取得し、次に別のアイテムを要求します。
  6. flatMap()は2、次の項目を取得し、これが放出され、観察者がonNext()が呼び出され
  7. observeOn()バッファに渡されます。フロー制御は、それを行うための方法ではありませんそして、>「プロセス1」 - あなたは、ロックステップ、「要求1」を完璧必要がある場合は
  8. flatMap()は3

、次の項目を取得します。代わりに、オブザーバが次のオブザーバブルを処理するようにオブザーバが指示するようなフィードバックループを提供するオブザーバブルを導入することができます。

+0

詳細な手順をお寄せいただきありがとうございます。私はまた、「流動性がある」ことはできないと思うので、私はあなたの「フィードバックループ」方法を試みます。 – xymelon

関連する問題