私はRxJavaを使用していますが、別の実装からの回答を翻訳できるかもしれません。Rxバッファをフラッシュする方法は?
私はアイテムのシーケンスを放射観測を持っている、と私はこのような10何かのグループにバッチにそれらをしたい:
observable
.buffer(10)
.map(items -> {
StringBuilder sb = new StringBuilder();
for (String item : items)
sb.append(item.document);
return sb.toString();
})
.subscribe(...)
素晴らしい作品
。しかし、私は不完全なバッファをフラッシュする必要があります。今のところ、9つの文字列を出した後に私の観測が完了すれば、私の加入者は決して呼び出されません。
組み込み演算子でこれを行う方法はありますか、またはonComplete
を見ながらバッファリングを行うカスタムオブザーバを作成する必要はありますか?
ありがとうございます!
更新:さらに調査した後、問題は、私はテストの下でうまく働いたいくつかのカスタム観測があったが、一緒に配線したときに破ったところ、上流であることが判明。なぜ周り
Observable.never()
.mergeWith(Observable.range(1,8))
.buffer(5)
私はかなり得ていない私の頭を、それnever()
は処理取得からの部分的バッチを防ぐ:私は基本的にこれをやりました。しかし、これはうまくいきます。
Observable.empty()
.mergeWith(Observable.range(1,8))
.buffer(5)
私は勉強します。助けてくれてありがとう!
バッファーご利用の場合にはうまく機能、ここhttps://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java
はまた
Window
オペレータに見て、バッファのより多くの例を見ることができます完了します。どのようにこのシーケンスを観察し、あなたのソースは何ですか? – akarnokd