2017-07-10 1 views
2

subscribeOnflatMapを組み合わせるとUndeliverableExceptionが表示されます。この最小限の例では、この問題を示していますrxJava 2スレッドプールとflatMapでの配信不能

@Test 
public void flatMapException() throws ExecutionException, InterruptedException { 
    SettableFuture<Boolean> f = SettableFuture.create(); 

    Observable.just(1,2).flatMap(x -> { 
     return Observable.just(1).flatMap(z -> { 
      if (z == 1) return Observable.error(new IOException("haha")); 
      return Observable.just(1); 
     }).subscribeOn(Schedulers.computation()); 
    }).onErrorReturnItem(1).subscribeOn(Schedulers.computation()).subscribe(
     x -> {}, 
     e -> { 
      f.set(true); 
     },() -> { 
      f.set(true); 
     }); 

    assertEquals(true, f.get()); 
} 

私は最初のエラーの後に観察可能で実行が停止し、登録解除を解雇されていることを期待しました。

最初の正常終了後にflatMapで返された2番目のObservableを購読したいと思います。

RxJavaで私の意図を表現するにはどうすればよいですか?

+3

'subscribeOn'は、2つのサブスクリプションが非同期になりので、あなたが観察できる第一および第二のエラーからのレースを持っています。両方のエラーを収集するには、外側の 'flatMap'の' delayErrors'パラメータを使用することができます。 – akarnokd

答えて

1

これは動作するようです:

Observable.just(1, 2, 3) 
    .map(x -> Observable.error(new Exception("e" + x))) 
    .compose(xs -> Observable.concat(xs)); 
関連する問題