2017-06-22 18 views
1

与えられたファイルを扱うことができる、またはできない、すなわちある特定のパーサーがある確率で成功するか、p > 0か、常に失敗する(p=0)という脆弱な(時には失敗する)パーサーがあるとします。 RxJavaを使用して、この一連のパーサーを受信ファイルのストリームに登録し、ファイルを解析する 'race'にすることは可能ですか?RxJava 2を使用して再試行でレースをオーケストレーションするにはどうすればよいですか?

パーサーが最初に失敗する可能性があるが、引き続きファイルを解析できる可能性がある場合、いくつかのバックオフポリシーを使用して再試行する必要があります。パーサーが指定されたファイルを処理できない場合、再試行回数に上限を設定する必要があります。指数バックオフを実装

は、この(source)のようなものでretryWhenを使用して実装することは比較的容易である:

source.retryWhen(errors -> 
       errors.zipWith(Observable.range(1, 3), (n, i) -> i) 
         .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS)) 
); 

しかし、並列レースを設定すると、私が行う方法を見つけ出すことはできませんものです。 amb演算子が私たちがここで望んでいるようですが、任意の数のストリームに適用するには、blockingIterableを使用する必要があると思われます(これは、ブロックするようにレースの目的を破ると思います)。私はambのこのユースケースに関連する有用なものをインターネット上で見つけることができませんでした。

Set<Parser> parserSet = new HashSet<>(); 
parserSet.add(new Parser(..., ..., ...)); 
// Add more parsers 
int numParsers = parserSet.size(); 

Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat(); 

fileSource 
    .flatMap(f -> parsers.take(numParsers) 
         .map(p -> p.parse(f)) 
         .retryWhen(/* snippet from above */) 
         .onErrorReturn(/* some error value */) 
      ).take(1) 

FlowableRETRYメソッドを持っているだけで、最近ParallelFailureHandlingsee this pr)の追加を得た.parallel()オペレータを導入したが、私は取得するように見えることはできません。

私の試みは、これまでこのような何かに似ていますそれらのうちの1つが戻った後に、再試行を停止する。

この問題はRxJavaで解決できますか?あなたのパーサは同期しているという合理的な仮定を作る

+0

特にblockingIterableが必要であるとは言わないが、どうしてそれが必要だと思ったのですか? –

+0

'Flowable.amb()'は 'Iterable <? published Publisher > '、' blockingIterable'呼び出しでそれらのフォームを取得する方法がありませんでした。私は '.reduce(Flowable :: amb)'を使ってみましたが、どちらも動かすことができませんでした。 –

+0

'Observable.amb'や' ambWith'はどうですか?(自分でそれほど経験はありませんでした。 –

答えて

1

Set<Parser> parserSet = new HashSet<>(); 
parserSet.add(new Parser(..., ..., ...)); 
// Add more parsers 
int numParsers = parserSet.size(); 

ArrayList<Flowable<T>> parserObservableList = new ArrayList<>(); 

for (Parser p: parserSet) { 
    parserObservableList.add(Flowable.fromCallable(() -> p.parse(f)) 
            .retryWhen(/* Add your retry logic */) 
            .onErrorReturn(/* some error value */)); 
} 

Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */); 

のようなものは、あなたの要件を満たす必要があります。

関連する問題