私はbackpressure
少し良く理解しようとするために一緒にこのダミーの例を置く:理解流動性の背圧rxjava2
Flowable.range(1, 100).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableSubscriber<Int>() {
override fun onStart() {
request(1)
}
override fun onComplete() {
Log.d([email protected]::class.java.simpleName, "onComplete")
}
override fun onNext(t: Int?) {
Log.d([email protected]::class.java.simpleName, t.toString())
Thread.sleep(1000)
request(1)
}
override fun onError(t: Throwable?) { //handle error}
})
私は非常に遅いSubscriber
を持っている非常に高速Flowable
からのデータを消費しています。そして私はFlowableにonBackPressureDrop()
に指示しています。加入者が非常に遅いですが、それはそうではありません、1から100までのすべての番号が印刷されているので、これにもかかわらず、私の出力は(1から100まで)、このようになります
07-16 23:07:21.097 22389-22389 D: 1
07-16 23:07:22.100 22389-22389 D: 2
07-16 23:07:23.102 22389-22389 D: 3
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.104 22389-22389 D: ...
07-16 23:07:24.105 22389-22389 D: 99
07-16 23:07:25.105 22389-22389 D: 100
07-16 23:07:25.107 22389-22389 D: onComplete
私は不足している要素を期待していました毎秒1つのコンソールに接続します。
次に、すべての値を一度に要求しようとしました。そこでをonStart
に置き換えてrequest(Long.MAX_VALUE)
に置き換え、onNext
コールからrequest(1)
を削除しました。要素数がなくても1〜100の数字が印刷されます。
ゆえに、低速の加入者のイベントの欠落をシミュレートするにはどうすればいいですか? バックプレッシャー例外を発生させるにはどうすればよいですか?
おかげ
それでした!ありがとう。私はドロップを経験しますが、 '.onBackPressureDrop()'演算子が存在する場合のみです。私はそれを削除すると、なぜ私は例外が発生しないのですか?バッファがいっぱいであってもすべての値を正常に出力します。 – feresr
'Flowable.range'はバックプレッシャをサポートしており、バッファの大きさにかかわらず' observeOn'と相互運用でき、決してMBEを通知しません。 'Flowable'の各演算子には、** Backpressure ** javadocのエントリがあり、その動作方法を説明しています。 – akarnokd