2017-07-04 7 views
2

私はここにブログの記事を読んでいる:http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.htmlRxJava - 「一つだけ発光を一度に観測可能なチェーンを走行させている...」

それはその

ありませんと言われて

どのスケジューラを購読しているのかによって、一度にObservableチェーンの上を移動できるのは1回だけです( )。 以下は、次の発光を開始する前に、放射源から加入者への放射をすべて まで押し込む必要があることがわかります。右その引用されたテキストの上

として書かれた例である:として出力して

public static void main(String[] args) { 

    Observable<Integer> source = Observable.range(1,10); 

    source.map(i -> i * 100) 
      .doOnNext(i -> System.out.println("Emitting " + i 
        + " on thread " + Thread.currentThread().getName())) 
      .observeOn(Schedulers.computation()) 
      .map(i -> i * 10) 
      .subscribe(i -> System.out.println("Received " + i + " on thread " 
        + Thread.currentThread().getName())); 

    sleep(3000); 
} 

(REF出力1)

Emitting 100 on thread main 
Emitting 200 on thread main 
Emitting 300 on thread main 
Emitting 400 on thread main 
Emitting 500 on thread main 
Emitting 600 on thread main 
Emitting 700 on thread main 
Emitting 800 on thread main 
Emitting 900 on thread main 
Emitting 1000 on thread main 
Received 1000 on thread RxComputationThreadPool-3 
Received 2000 on thread RxComputationThreadPool-3 
Received 3000 on thread RxComputationThreadPool-3 
Received 4000 on thread RxComputationThreadPool-3 
Received 5000 on thread RxComputationThreadPool-3 
Received 6000 on thread RxComputationThreadPool-3 
Received 7000 on thread RxComputationThreadPool-3 
Received 8000 on thread RxComputationThreadPool-3 
Received 9000 on thread RxComputationThreadPool-3 
Received 10000 on thread RxComputationThreadPool-3 

当初、私は最終的な結果は思いました次のようになります。(参考出力2)

Emitting 100 on thread main 
Received 1000 on thread RxComputationThreadPool-3 
Emitting 200 on thread main 
Received 2000 on thread RxComputationThreadPool-3 
Emitting 300 on thread main 
Received 3000 on thread RxComputationThreadPool-3 
Emitting 400 on thread main 
Received 4000 on thread RxComputationThreadPool-3 
Emitting 500 on thread main 
Received 5000 on thread RxComputationThreadPool-3 
Emitting 600 on thread main 
Received 6000 on thread RxComputationThreadPool-3 
Emitting 700 on thread main 
Received 7000 on thread RxComputationThreadPool-3 
Emitting 800 on thread main 
Received 8000 on thread RxComputationThreadPool-3 
Emitting 900 on thread main 
Received 9000 on thread RxComputationThreadPool-3 
Emitting 1000 on thread main 
Received 10000 on thread RxComputationThreadPool-3 

しかし、observeOnを呼び出すと、あるストリームが別のストリームに引き渡され、そのまま続けることができます。したがって、この例では、最初のマップとdoOnNextのすべてが2番目のマップより前に完了し、購読しているようです。

質問:


それは理論的にそれを言うことが正しいです、出力が「REF出力2」のようになりますか、それは常には「REF OUTPUT1」と同じに見えるでしょうか?

私が推論しているのは、上記の例で次の演算子に渡される前に、1つの演算子がすべてのオブザーバブルを完全に処理しているように見えるということです。理論的には

答えて

2

、両方REF OUTPUT1参照OUTPUT2は可能であるが、実際には、 OUTPUT1 refに近いものになります。 Schedulers.computation()がどのようにホットになるかに応じて、いくつかのインタリーブが表示されることがありますが、完璧なピンポンパターンを得ることはまずありません。

observeOnは128要素のプリフェッチを持ち、受信された最初の要素は内部キューの要素の非同期再発光をトリガーします。エグゼキュータのスレッドはしばしば十分に速く開始/再開しないので、再発光が開始される時間は、メインスレッドはすでにすべての値を放出しています。

observeOnをプリフェッチ値が1になるように設定すると、ピンポン効果を得ることができます。しかし、この場合、オリジナルのスレッドに残るのではなく、オリジナルのエミッションが再発光スレッドにドラッグされる可能性があります。rangeの後にsubscribeOnを導入して、最初のdoOnNextが確実にスレッドに残っていることを確認してください。 (また、標準のスケジューラで実行をメインスレッドにピン止めできないことに注意してください。そのためには、blocking schedulerが必要です)。

+0

パーフェクト。説明をありがとう。私はピンポン効果がないと考えていましたが、私は、RxJavaに** ref出力1 **のみが可能であると考えていないことを確認したかったのです。これはUIプラットフォーム(javafxやアンドロイドなど)で作業していた場合は私を痛感し、別のオペレータに移動する前にすべての作業を1つのオペレータで完了しなければならないと仮定しました。私は128要素のプリフェッチについて知らなかった。ありがとう。 – EGHDK

関連する問題