与えられたファイルを扱うことができる、またはできない、すなわちある特定のパーサーがある確率で成功するか、p > 0
か、常に失敗する(p=0
)という脆弱な(時には失敗する)パーサーがあるとします。 RxJavaを使用して、この一連のパーサーを受信ファイルのストリームに登録し、ファイルを解析する 'race'にすることは可能ですか?RxJava 2を使用して再試行でレースをオーケストレーションするにはどうすればよいですか?
パーサーが最初に失敗する可能性があるが、引き続きファイルを解析できる可能性がある場合、いくつかのバックオフポリシーを使用して再試行する必要があります。パーサーが指定されたファイルを処理できない場合、再試行回数に上限を設定する必要があります。指数バックオフを実装
は、この(source)のようなものでretryWhen
を使用して実装することは比較的容易である:
source.retryWhen(errors ->
errors.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
しかし、並列レースを設定すると、私が行う方法を見つけ出すことはできませんものです。 amb
演算子が私たちがここで望んでいるようですが、任意の数のストリームに適用するには、blockingIterable
を使用する必要があると思われます(これは、ブロックするようにレースの目的を破ると思います)。私はamb
のこのユースケースに関連する有用なものをインターネット上で見つけることができませんでした。
Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers
int numParsers = parserSet.size();
Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();
fileSource
.flatMap(f -> parsers.take(numParsers)
.map(p -> p.parse(f))
.retryWhen(/* snippet from above */)
.onErrorReturn(/* some error value */)
).take(1)
Flowable
がRETRY
メソッドを持っているだけで、最近ParallelFailureHandling
(see this pr)の追加を得た.parallel()
オペレータを導入したが、私は取得するように見えることはできません。
私の試みは、これまでこのような何かに似ていますそれらのうちの1つが戻った後に、再試行を停止する。
この問題はRxJavaで解決できますか?あなたのパーサは同期しているという合理的な仮定を作る
特にblockingIterableが必要であるとは言わないが、どうしてそれが必要だと思ったのですか? –
'Flowable.amb()'は 'Iterable <? published Publisher extends T>> '、' blockingIterable'呼び出しでそれらのフォームを取得する方法がありませんでした。私は '.reduce(Flowable :: amb)'を使ってみましたが、どちらも動かすことができませんでした。 –
'Observable.amb'や' ambWith'はどうですか?(自分でそれほど経験はありませんでした。 –