0
私のオブザーバはオーバープロデュースであり、すべての要素を処理する必要があります。私が最初に購読するとき、私はできるだけ早く要素を受け取りたいが、 まで他の要素をバッファしたい。最初の要素が処理される と b)1秒のタイムアウト。RxJavaが最初の要素を出して他をバッファする
私の現在の実装は次のようになります。
connectionService.subscribe(request)
.buffer(1, TimeUnit.SECONDS)
.flatMap(merger)
.subscribe(...)
"合併は" 作業を行います。これで2つの問題があります。
1)はマージが1秒以上かかる場合、私は時に次の要素を受け取る)最初の要素があまりにも1秒の遅延を持つことになりますが、それはすぐに
2利用可能であるべきです
connectionService.subscribe(request)
.onBackpressureBuffer(...)
.flatMap(merger, 1)
.subscribe(...)
何が起こるかを指定するためにonBackpressureBuffer
オペレータのパラメータを使用します。以前は(最初の要素をマージするとき、それは唯一の問題だということに注意してください)
これは正しいですか:要素を生成する観測可能性があります。各要素は次々に処理されなければなりません。 1つの要素の「処理」が1秒よりも長くかかる場合、次の要素を取得して処理することができますか? 1秒未満の場合は、次のプロセスを実行して処理します。それは正しいのでしょうか? –
@ HansWurst各要素は前の要素に依存します(jsonパッチ文字列http://jsonpatch.com/)。すべての要素を順番に処理しなければなりません。最初の要素は最初のjsonであるため、別々に計算する必要がありますが、次の要素(jsonパッチ要素)を組み合わせることができます。 – adam0404
さて、最初のemmited要素はinitです。 json。以下はinitに適用されるパッチです。 json。あなたはreduce/scanを使って放出されたすべてのパッチを最初のjsonに適用し、最後にすべてのパッチが適用されたjsonを得るでしょう。しかし、プロデューサがパッチを適用するよりも速く生成する可能性があります。このためには、rxjavaのバックプレッシャを使用します。それは正しいか? –