2017-03-12 3 views
0

私のオブザーバはオーバープロデュースであり、すべての要素を処理する必要があります。私が最初に購読するとき、私はできるだけ早く要素を受け取りたいが、 まで他の要素をバッファしたい。最初の要素が処理される と b)1秒のタイムアウト。RxJavaが最初の要素を出して他をバッファする

私の現在の実装は次のようになります。

connectionService.subscribe(request) 
.buffer(1, TimeUnit.SECONDS) 
.flatMap(merger) 
.subscribe(...) 

"合併は" 作業を行います。これで2つの問題があります。

1)はマージが1秒以上かかる場合、私は時に次の要素を受け取る)最初の要素があまりにも1秒の遅延を持つことになりますが、それはすぐに

2利用可能であるべきです

connectionService.subscribe(request) 
.onBackpressureBuffer(...) 
.flatMap(merger, 1) 
.subscribe(...) 

何が起こるかを指定するためにonBackpressureBufferオペレータのパラメータを使用します。以前は(最初の要素をマージするとき、それは唯一の問題だということに注意してください)

+0

これは正しいですか:要素を生成する観測可能性があります。各要素は次々に処理されなければなりません。 1つの要素の「処理」が1秒よりも長くかかる場合、次の要素を取得して処理することができますか? 1秒未満の場合は、次のプロセスを実行して処理します。それは正しいのでしょうか? –

+0

@ HansWurst各要素は前の要素に依存します(jsonパッチ文字列http://jsonpatch.com/)。すべての要素を順番に処理しなければなりません。最初の要素は最初のjsonであるため、別々に計算する必要がありますが、次の要素(jsonパッチ要素)を組み合わせることができます。 – adam0404

+1

さて、最初のemmited要素はinitです。 json。以下はinitに適用されるパッチです。 json。あなたはreduce/scanを使って放出されたすべてのパッチを最初のjsonに適用し、最後にすべてのパッチが適用されたjsonを得るでしょう。しかし、プロデューサがパッチを適用するよりも速く生成する可能性があります。このためには、rxjavaのバックプレッシャを使用します。それは正しいか? –

答えて

0

使用この(RxJava 1を想定)のようなものを扱っていませんそれがオーバーフローするとき。

.flatMap(..., 1)は、正確に1つの項目が上流から要求されるようにします。

関連する問題