2016-12-22 18 views
3

私はRxJavaを使用していますが、別の実装からの回答を翻訳できるかもしれません。Rxバッファをフラッシュする方法は?

私はアイテムのシーケンスを放射観測を持っている、と私はこのような10何かのグループにバッチにそれらをしたい:

observable 
    .buffer(10) 
    .map(items -> { 
     StringBuilder sb = new StringBuilder(); 
     for (String item : items) 
      sb.append(item.document); 
     return sb.toString(); 
    }) 
    .subscribe(...) 
素晴らしい作品

。しかし、私は不完全なバッファをフラッシュする必要があります。今のところ、9つの文字列を出した後に私の観測が完了すれば、私の加入者は決して呼び出されません。

組み込み演算子でこれを行う方法はありますか、またはonCompleteを見ながらバッファリングを行うカスタムオブザーバを作成する必要はありますか?

ありがとうございます!

更新:さらに調査した後、問題は、私はテストの下でうまく働いたいくつかのカスタム観測があったが、一緒に配線したときに破ったところ、上流であることが判明。なぜ周り

Observable.never() 
    .mergeWith(Observable.range(1,8)) 
    .buffer(5) 

私はかなり得ていない私の頭を、それnever()は処理取得からの部分的バッチを防ぐ:私は基本的にこれをやりました。しかし、これはうまくいきます。

Observable.empty() 
    .mergeWith(Observable.range(1,8)) 
    .buffer(5) 

私は勉強します。助けてくれてありがとう!

+0

バッファーご利用の場合にはうまく機能、ここhttps://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java

はまたWindowオペレータに見て、バッファのより多くの例を見ることができます完了します。どのようにこのシーケンスを観察し、あなたのソースは何ですか? – akarnokd

答えて

1

バッファから最後のアイテムを放出していない理由はわかりません。 バッファはアイテムのグループを放出しますが、すべてのアイテムを放出します。

は、最後の番号10は、グループ内の唯一の要素である出射さit's場合でも

@Test 
public void stringBuffer() { 
    Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; 
    Observable.from(numbers) 
      .map(number -> "uniqueKey=" + number) 
      .buffer(5) 
      .map(ns -> String.join("&", ns)) 
      .subscribe(System.out::println); 

} 

この例をチェックしてください。

あなたは多分前にも最終的に不完全なリストを放出しなければならない

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableWindow.java

+0

あなたは正しいです...さらなる調査の後、私の観察可能なチェーンをさらに上げることによって問題が判明しました。質問の根本的な原因をより詳細に説明しました。ありがとう!しかし、あなたの例は部分的なバッチを生成しません。例えば8になるように配列を変更するほうが良い例です(最後のバッチは3つのアイテムしか持たないでしょう)。 – starkos

関連する問題