RxJavaでWebサービスコールを最適化しようとしていますが、応答に時間がかかりすぎることはありません。そのために は、私はこのようなセレクタを閉じるようdebounce()
でbuffer(closingSelector)
演算子を使用しています:Rxリミット+タイムアウトを伴う高速生成ストリームのバッファリング
Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */);
それはrequestStream
が速すぎて生成する場合、それはWSが一度に処理するには大きすぎる巨大なバッチを放出することを除いて、正常に動作し、何とかバッチサイズを制限したいと思います。 したがって、バッファにX項目があるか、アップストリームから最後に到着してからYが経過した場合に閉じるイベントを発行するclosingSelector
が必要です。
Operator
というカスタムを実装する以外の良い解決策は見つからないようですが、OperatorDebounceWithTime
に似ていますが、最後のバッファではなくバッファ内のすべての要素を返す内部バッファがあります。
これを実現する簡単な方法はありますか?いくつかの操作を組み合わせることによって?
編集:私は、コード片が上記別の問題があることに気づい
質問を投稿した後:要求がデバウンスタイムアウトよりも継続的に速く流れる場合(requestStream
windowSize
よりも早く生産しているが)、その後burstyDeBounced
は何も放出しません受信ストリームに十分長いポーズがあるまで、すべての要求がバッファされます。
最後にこの問題の良い解決策を見つけましたか?もしあなたがそれを投稿することができますか?乾杯。 – cerisier