2017-06-21 6 views
4

最近、私は背圧がどのように働くのか分かりません。RxJava2観測可能な背圧

私は小さなテストをしたと私はそれがMissingBackpressureException例外で失敗する必要があることを期待:それはMissingBackpressureExceptionを生成しないのはなぜ

Emit: 0 
Emit: 1 
Emit: 2 
... 
Emit: 10000 

Processed:0 
Processed:1 
Processed:2 
... 
Processed:10000 

:横

@Test 
public void testBackpressureWillFail() { 
    Observable.<Integer>create(e -> { 
     for (int i = 0; i < 10000; i++) { 
      System.out.println("Emit: " + i); 
      e.onNext(i); 
     } 
     e.onComplete(); 
    }) 
    .subscribeOn(Schedulers.newThread()) 
    .observeOn(Schedulers.computation()) 
    .doOnNext(i -> { 
     Thread.sleep(100); 
     System.out.println("Processed:" + i); 
    }) 
    .blockingSubscribe(); 
} 

は、システムアウトショーを。

私はe.onNext(i);ObservableObserveOnのバッファにアイテムを置くと、それのサイズはstatic final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

より大きくなった後それが起こらないMissingBackpressureExceptionを投げる必要があることを期待しています。バッファーは自動的に拡大しますか?アイテムはどこに保管されていないのですか?

+1

'Observable'sは背圧をサポートしていない、唯一' Flowable'sが –

+1

を行う私は、彼らがbackpreassureをサポートしていない、ということを知っているが、私はそれはMissingBackpressureExceptionは次のようになり手段をサポートしていないと思いましたスローされ、オートではないバッファーの成長。 –

答えて

3

背圧がRxJava2の場合にのみFlowableに移動したためです(hereを参照)。
FlowableBackpressureStrategy.MISSINGを入力すると例外が発生します。また、あなたのケースでは、あなたが実際にobserverOnドキュメントから自動的に成長するバッファ、 を持っていることを意味し

は非同期無制限バッファで、指定されたスケジューラにその排出量との通知を行うためObservableSourceを変更します。 .. RxJava2で

+0

ありがとうございます、あなたが説明してください、何が公開されている最終的な理由Observable observeOn(Scheduler scheduler、boolean delayError、int bufferSize)Observableでバッファが無制限の場合?私はそれがバッファーが成長する大きさになると訂正していますか? –

+0

まだ、私はそれもドキュメントによると、これはバッファの増分ステップである "島"の設定可能なサイズだと思います。 – yosriz

関連する問題