2016-09-09 14 views
1

RxJavaでWebサービスコールを最適化しようとしていますが、応答に時間がかかりすぎることはありません。そのために は、私はこのようなセレクタを閉じるようdebounce()buffer(closingSelector)演算子を使用しています:Rxリミット+タイムアウトを伴う高速生成ストリームのバッファリング

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share(); 
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit); 
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */); 

それはrequestStreamが速すぎて生成する場合、それはWSが一度に処理するには大きすぎる巨大なバッチを放出することを除いて、正常に動作し、何とかバッチサイズを制限したいと思います。 したがって、バッファにX項目があるか、アップストリームから最後に到着してからYが経過した場合に閉じるイベントを発行するclosingSelectorが必要です。

Operatorというカスタムを実装する以外の良い解決策は見つからないようですが、OperatorDebounceWithTimeに似ていますが、最後のバッファではなくバッファ内のすべての要素を返す内部バッファがあります。

これを実現する簡単な方法はありますか?いくつかの操作を組み合わせることによって?

編集:私は、コード片が上記別の問題があることに気づい

質問を投稿した後:要求がデバウンスタイムアウトよりも継続的に速く流れる場合(requestStreamwindowSizeよりも早く生産しているが)、その後burstyDeBouncedは何も放出しません受信ストリームに十分長いポーズがあるまで、すべての要求がバッファされます。

+0

最後にこの問題の良い解決策を見つけましたか?もしあなたがそれを投稿することができますか?乾杯。 – cerisier

答えて

0

あなたは小さなものにデバウンスソースの大容量バッファを分割することができます:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share(); 
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast 
    .debounce(windowSize, windowUnit); 

burstyMulticast.buffer(burstyDeBounced) 
.onBackpressureBuffer() 
.concatMapIterable(list -> Lists.partition(list, windowSizeLimit)) 
.subscribe(...); 

Lists.partition

をGoogleグアバからです。

+0

あなたのご意見ありがとうございます。残念ながら私は上記の元のコード部分に別の問題があることを認識しました。したがって、私はdebounceを使用することはできません。 –

関連する問題