2017-02-03 9 views
1

RxJavaを使用して、私のdbと同期するバックグラウンドジョブを作成しています。
外部ソースに接続し、エントリの処理、マッピング、およびdbへの挿入を開始します。
終了時には、すべての要素が処理されたリストが必要ですが、すべてがうまくいくと取得できますが、フロー中に何らかの処理が失敗した場合、すべての要素をどのように収集できますか?私が持っていると思い何ReactiveX障害の前に処理された要素を収集します。

final List<String> res = Observable.create(onSubscribe) 
     .buffer(4) 
     .flatMap(TestRx::doStuff) 
     .buffer(8) 
     .map(TestRx::calculateList) 
     .toList() 
     .toBlocking() 
     .single(); 
System.out.println("strings = " + res); 

doStuffまたはcalculateList例外をスローした場合、フローは停止し、それがエラーになるまで処理されたすべてのもののリストを返すことの方法です。エラー履歴書上の

答えて

2
List<String> res = Observable.create(onSubscribe) 
    .buffer(4) 
    .flatMap(TestRx::doStuff) 
    .onErrorResumeNext(Observable.empty()) // turn error into completion 
    .buffer(8) 
    .map(TestRx::calculateList) 
    .onErrorResumeNext(Observable.empty()) // turn error into completion 
    .toList() 
    .toBlocking() 
    .single(); 
System.out.println("strings = " + res); 
+0

は、私はオリジナルの流れがエラーで停止@rascioそれは誤り – rascio

+1

に停止する必要があり、フローを実行し続け、処理はあなたが 'empty'流れである' onErrorResumeNext'、と提供新しい流れに続きますつまり、すぐにフローを完了します。 –

関連する問題