2016-08-12 6 views
2

RxJavaでバックプレッシャーのためにいくつかのドキュメントを読んでいますが、ライブラリ内でどのように内部的に起こるかなど詳細な説明は見つかりませんでした。 「消費者」が遅すぎる。以下のコードのような例えば BackpressureはRxJavaの内部でどのように起こるのですか

Observable.interval(1, TimeUnit.MILLISECONDS) 
    .observeOn(Schedulers.newThread()) 
    .subscribe(
     i -> { 
      System.out.println(i); 
      try { 
       Thread.sleep(100); 
      } catch (Exception e) { } 
     }, 
     System.out::println); 
アイブ氏は、私の理解は、メインスレッドで、我々はつもりであるということですので、RxJavaのソースコードを経由して

があらゆるミリ秒単位でイベントを発行し、我々はそれを放出した後、我々は合格値をSystem.out.println(i)メソッドに渡して、newTheadスケジューラのスレッドプールにスローし、runnable内でメソッドを実行します。

私の質問は、例外はどのように内部的に発生するのですか? Thread.sleep()を呼び出すと、スレッドプール内の他のスレッドに影響を与えずに、メソッド呼び出し - > System.out.println()を処理するスレッドがスリープしているだけです。なぜなら、スレッドプールは十分なスレッドをもう使えないからですか?

Thanks

答えて

3

Youが1 operatorのhands its upstream sourceへout permitsのsystemとしてbackpressureのthinkでき:youはme 128 elementsをgiveでき。ちょっと後で、このオペレータは「大丈夫、もう一度96をくれ」と言うかもしれません。 intervalのような一部の情報源は、許可を気にせず、定期的に値を渡すだけです。許可の数は、通常、キューまたはバッファ内の使用可能な容量に強く結びついているため、これらの記憶域を超えるものを渡すと、収量はMissingBackpressureExceptionになります。

バックプレッシャ違反の検出は、主に、キューがいっぱいであることを示すobserveOnのような、境界キューに対するofferがfalseを返す場合に発生します。

違反を検出する第2の方法は、onBackpressureDropように、オペレータに未処理の許可回数を追跡することで、上流側がこれより多くを送信するたびに、オペレータは、単にそれを転送しないであろう。

// in onBackpressureDrop 
public void onNext(T value) { 
    if (emitted != availablePermits) { 
     emitted++; 
     child.onNext(value); 
    } else { 
     // ignoring this value 
    } 
} 

availablePermitsがあり、原因の可能な非同期実行に、実際には

public void childRequested(long n) { 
    availablePermits += n; 
} 

:子加入者は、通常onBackpressureDropにこのような何かにつながる要求(経由でその許可を)知らせますn AtomicLong(これはrequestedと呼ばれます)。

+0

RxJavaの一部の演算子は、キューまたはバッファのデータ構造体にonNext()イベントを配置すると言っていますか?例外はスレッドプールによるものではありません;) – Qing

関連する問題