2017-05-16 11 views
0

何千ものアイテムを放出するブロッキング長時間観測で放出されたばかりのアイテムを知りたい。以下のコードは動作しますが、range()からの大きなバッファが作成されます。RxJava:放出中に放出された元素をカウントする

外部カウンタフィールドを導入せずにこの動作を回避する方法はありますか?

答えて

3

バッファはObservable.rangeに起因します。これは、sourceObservableよりも速く生成される可能性があります。これは、sourceObservableから正しいものをzipするためにすべての値をバッファリングする必要があります。

私の実装を見ていてください:

@Test 
void stackoverflow44004014() { 
    Observable.just("i", "b", "c") 
      .scan(0, (counter, sourceValue) -> { 
       return ++counter; 
      }) 
      .skip(1) 
      .test() 
      .assertResult(1, 2, 3); 
} 
+0

私が探していたました。ありがとうございました。カウンタを変更してソース値を保持するペアにする必要があり、ソース値をカウンタとともにストリームにさらに伝播できるようにする必要があります。 – Mariusz

関連する問題