最近、私は背圧がどのように働くのか分かりません。RxJava2観測可能な背圧
私は小さなテストをしたと私はそれがMissingBackpressureException
例外で失敗する必要があることを期待:それはMissingBackpressureException
を生成しないのはなぜ
Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000
Processed:0
Processed:1
Processed:2
...
Processed:10000
:横
@Test
public void testBackpressureWillFail() {
Observable.<Integer>create(e -> {
for (int i = 0; i < 10000; i++) {
System.out.println("Emit: " + i);
e.onNext(i);
}
e.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> {
Thread.sleep(100);
System.out.println("Processed:" + i);
})
.blockingSubscribe();
}
は、システムアウトショーを。
私はe.onNext(i);
がObservableObserveOn
のバッファにアイテムを置くと、それのサイズはstatic final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());
より大きくなった後それが起こらないMissingBackpressureException
を投げる必要があることを期待しています。バッファーは自動的に拡大しますか?アイテムはどこに保管されていないのですか?
'Observable'sは背圧をサポートしていない、唯一' Flowable'sが –
を行う私は、彼らがbackpreassureをサポートしていない、ということを知っているが、私はそれはMissingBackpressureExceptionは次のようになり手段をサポートしていないと思いましたスローされ、オートではないバッファーの成長。 –