2017-07-11 4 views
3

combineLatest演算子で得られた組み合わせから常に最後の値を取得することに問題があります。Flowable/Observable Bufferを無効にする

私は高周波で2つのホットフロアブル剤(A、B)を生成イベントを持っている(イベントごとに100ミリ秒ごと):それは仕事だ行うには、ほぼ300ミリ秒を要しcombineLatest、と組み合わせる

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value); 
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value); 

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,  OrderBookCouple::new).observeOn(Schedulers.newThread()); 
combined.subscribe((bookCouple) -> { 
       System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp()); 
       System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp()); 
       Thread.sleep(300); 
      } 

コンバイナの一つを実行した後、私はプロセスは非常に最後の組み合わせ生成されたイベントの、意味(lastA、lastB)したいと思います。

組み合わせフローのデフォルトの動作は、すべての組み合わせのイベントを独自のバッファにキャッシュして、組み合わせフローが非常に古い組み合わせを受信し、この時間ギャップが爆発的になるようにすることです。

このバッファを無効にし、常に最後の組み合わせを受け取るようにコードを変更する必要がありますか?

答えて

4

flowAflowBの両方に適用でき、overload of combineLatestを使用してプリフェッチ量を指定できます。

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()), 
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]), 
    1 
) 
.onBackpressureLatest() 
.observeOn(Schedulers.newThread(), false, 1) 

残念ながら、このようにあなたは、配列の要素をキャストに戻す必要がありBiFunctionとbufferSizeの両方を取る過負荷はありません。

編集

onBackpressureLatestを適用し、combineLatestは、このユースケースを目的としていないが、近い所望のパターンにあなたを取得する必要がありますobserveOnにバッファサイズを制限します。おそらく、いくつかのマルチサンプル演算子が必要でした。

+0

ここでは単純化した例であなたのソリューションを試しましたが、まだバッファリングがあるようです。コードはここにあります:[link](https://gist.github.com/Ambros94/06d50869eec77a5758c30c4ed66ab101) –

+2

私の答えが更新されました。 – akarnokd

+0

これは動作していますので、それに応じて要点を更新しました。ありがとうございました。 –

関連する問題