2016-11-17 4 views
1

インターバル演算子を使用していますが、私のパイプラインで例外が発生しても項目を放し続けたいと思います。間隔はonErrorResumeNextの後に続きます

したがって、私はonErrorResumeNext Exceptionの場合にアイテムを放出しようとします。しかし、私はこのアイテムを放出した後、間隔がより多くのアイテムを放出するのを見た。

ここで私のユニットテスト。

@Test 
public void testIntervalObservableWithError() { 
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .map(item -> item = null) 
      .map(String::toString) 
      .onErrorResumeNext(t-> Observable.just("item with error emitted")) 
      .subscribe(System.out::print, t->{ 
         System.out.println(t); 
        } 
        ); 
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription); 
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS); 
} 

- 私onErrorResumeNext

SOLUTIONからアイテムを受け取るit's場合は、この動作を、なぜ、観察退会と混同:いくつかの説明の後

が、私はそのときにエラー実現観察可能なtが完了します。だから私は別のobservableに例外を持つことができるobservableをラップしてしまい、私はflatMapを使っています。それで主Observableはアイテムを放出し続けます。

@Test 
public void testIntervalObservableWithError() { 
    Observable.interval(100, TimeUnit.MILLISECONDS) 
      .map(time -> "item\n") 
      .flatMap(item -> Observable.just(item) 
        .map(String::toString)) 
      .subscribe(System.out::print); 
    TestSubscriber testSubscriber = new TestSubscriber(); 
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS); 
} 

私が知りたいと思うすべての魔法をやり遂げることができる演算子があれば。

Regrads

答えて

1

onErrorResumeNextがトリガされたときに、あなたの上流に既にエラーで終了したので、あなたのサブスクリプションは中断します。そして、あなたは単に例外を下流に流すのではなく、アイテムを出すだけです。アップストリームを生き続けるためには、例外がスローされないようにする必要があります。あなたの特定の例のソリューションは、このようにすることができますについては

... 
    .map(time -> "item\n") 
    .map(item -> item = null) 
    .map(item -> { 
     try { 
      return item.toString(); 
     } catch (NullPointerException e) { 
      return "item with error emitted"; 
    }) 
    //no onErrorResumeNext() 
    .subscribe ... 

onErrorResumeNextはちょうど項目に誤りを置き換え、onCompleteを呼び出します。

+0

こんにちは、私のコード例をお読みください。私はonErrorResumeNextを使用しており、動作していません。 – paul

+0

私は最初あなたのコードを読んだ。あなたのケースでは 'null'アイテムで' .map(String :: toString) 'を呼び出すので、ストリームはすぐにエラーで終わり、このエラーは' onResumeErrorNext() 'によって捕捉されます。項目を送出し、ストリームは 'onComplete'を呼び出します。それはあなたの質問への答えです**なぜ観測可能な退会ですか?**。また、私は私の答えを更新しました。 –

+0

さらに更新されました。 –

1

RxJavaストリームによって使用される契約は、エラーが放出されると、これ以上の項目が放出されなければならないということです。ユースケースで、エラーの後にストリームが継続することが必要な場合は、エラーをonNextエミッションに変換する必要があります。ラッパー型を作成ValueOrError<T>を言うとObservable<ValueOrError<T>>の観点で考え始める:

Observable<Integer> source = ... 
Observable<ValueOrError<Integer>> o = 
    source.map(x -> { 
    try { 
     return new ValueOrError<>(mightThrow(x)); 
    } 
    catch (Throwable e) { 
     return new ValueOrError<>(e); 
    }); 
関連する問題