2017-07-17 6 views
1

私はbackpressure少し良く理解しようとするために一緒にこのダミーの例を置く:理解流動性の背圧rxjava2

Flowable.range(1, 100).onBackpressureDrop() 
         .subscribeOn(Schedulers.io()) 
         .observeOn(AndroidSchedulers.mainThread()) 
         .subscribeWith(object : DisposableSubscriber<Int>() { 
         override fun onStart() { 
          request(1) 
         } 

         override fun onComplete() { 
          Log.d([email protected]::class.java.simpleName, "onComplete") 
         } 

         override fun onNext(t: Int?) { 
          Log.d([email protected]::class.java.simpleName, t.toString()) 
          Thread.sleep(1000) 
          request(1) 
         } 

         override fun onError(t: Throwable?) { //handle error} 
         }) 

私は非常に遅いSubscriberを持っている非常に高速Flowableからのデータを消費しています。そして私はFlowableにonBackPressureDrop()に指示しています。加入者が非常に遅いですが、それはそうではありません、1から100までのすべての番号が印刷されているので、これにもかかわらず、私の出力は(1から100まで)、このようになります

07-16 23:07:21.097 22389-22389 D: 1 
07-16 23:07:22.100 22389-22389 D: 2 
07-16 23:07:23.102 22389-22389 D: 3 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.104 22389-22389 D: ... 
07-16 23:07:24.105 22389-22389 D: 99 
07-16 23:07:25.105 22389-22389 D: 100 
07-16 23:07:25.107 22389-22389 D: onComplete 

私は不足している要素を期待していました毎秒1つのコンソールに接続します。

次に、すべての値を一度に要求しようとしました。そこでをonStartに置き換えてrequest(Long.MAX_VALUE)に置き換え、onNextコールからrequest(1)を削除しました。要素数がなくても1〜100の数字が印刷されます。

ゆえに、低速の加入者のイベントの欠落をシミュレートするにはどうすればいいですか? バックプレッシャー例外を発生させるにはどうすればよいですか?

おかげ

答えて

3

observeOnは、あなたが、それは単にあなたが生成されているすべての100個の要素をバッファすることができるよう要素がドロップされ表示されていない理由です、128のデフォルトの内部バッファのサイズを持っています。 observeOn(mainThread(), false, 1)を使用してバッファサイズを1に設定し、ドロップを経験できます。

+0

それでした!ありがとう。私はドロップを経験しますが、 '.onBackPressureDrop()'演算子が存在する場合のみです。私はそれを削除すると、なぜ私は例外が発生しないのですか?バッファがいっぱいであってもすべての値を正常に出力します。 – feresr

+1

'Flowable.range'はバックプレッシャをサポートしており、バッファの大きさにかかわらず' observeOn'と相互運用でき、決してMBEを通知しません。 'Flowable'の各演算子には、** Backpressure ** javadocのエントリがあり、その動作方法を説明しています。 – akarnokd

関連する問題