2017-08-15 3 views
0

私はRxJavaの複雑さとグリップを取得しようとしますが、初心者の問題をヒットしています:RxJava:ホット観測の消費者

私は寒い1から熱いに観察を作成しようと、これは2人の消費者をサブスクライブしていますプッシュされたイベントを異なる速度で処理します。ここでは、コードスニペットです:

ConnectableObservable<Long> ob = Observable.interval(200, TimeUnit.MILLISECONDS) 
    .publish(); 
ob.connect();  

Consumer<Long> withSleep = (Long t) -> { 
    System.out.println("Second : " + t); 
    sleep(1); 
}; 

Consumer<Long> noSleep = (Long t) -> { 
    System.out.println("First : " + t); 
}; 

sleep(2); 

ob.observeOn(Schedulers.newThread()).subscribe(noSleep); 

ob.observeOn(Schedulers.newThread()).subscribe(withSleep); 

sleep(5); 

睡眠(2)だけで、観察が既に発射を開始するかどうかを確認することです。そして、実際には、これは当初期待どおりに印刷されます。

  1. まず:10
  2. 第10
  3. まず:11
  4. まず:12
  5. まず:13
  6. まず:14
  7. 第11
  8. 最初:15
  9. まず:16
  10. まず:17

しかし、第2の消費者(1秒スリープによってシミュレート長い処理時間、を有するもの)は、配列(出力線7)にイベントをピックアップし、現在のイベントではなく、 14)私は熱い観測から期待しています。購読者とは無関係に放火を続けるだけでなく、加入者が(特定の明白な背圧戦略がないと仮定して)当面押し込まれたものを拾い上げるという熱い観察可能な考えはありませんか?

2人目の消費者が現時点で生産されたものを単に取り上げるためには、何を変更する必要がありますか(つまり、上記の例では11の代わりに14が表示されます)。

ご協力いただければ幸いです。

答えて

2

publishやobserveOnのような演算子は、購読してから継続しているため、処理が排出レートよりも長くなってもすべてのイベントを取得します。

後者の場合には古いエントリの処理を回避するには、あなたがイベントをドロップすると、あまりにも多くのいずれかのバッファリングしていないチェーン事業者にあります。

ob.toFlowable(BackpressureStrategy.DROP) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 

または

ob.toFlowable(BackpressureStrategy.LATEST) 
    .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread()) 
    .rebatchRequests(1) 
    .subscribe(withSleep); 
+0

おかげで、これはありませんトリック。しかし、なぜ遅延演算子が必要なのかは分かりません(それがなければ、もちろん動作しません)。私はそれがいくつかの必要な副作用(バッファ、スレッディング)のためだと思っていますが、少し操作して、私はこの演算子から期待するものではないようです。また、遅延演算子のスケジューラを削除しても同じ結果が得られると思われます。 –

+0

遅延はバックプレッシャ要求をバッファしたり干渉したりすることはないので、 'observeOn'のようなデータはプリフェッチされず、ストリームが実際に処理されている間に放出されたデータをスキップします。 – akarnokd