2017-09-30 20 views
0

問題を再現するサンプルプログラムがあります。Observable.flatMapを使用している場合のエラー処理

問題:

  1. は、前述のObservableに購読一部Observable
  2. flatMap変換を適用Observable前に、サブスクリプション、前述のサブスクリプションのどこか
  3. 処分を保存するには、自然に
  4. 終了しますマッパー関数によって返されたObservableは、Exception
  5. FlatmapのオペレータがException処理する方法を知らない、/それ、プログラムがクラッシュを発生させず終了し

優先/期待される動作:エラーがクラッシュするのではなく、私のonErrorハンドラに伝播する必要があります

  • RxJavaPlugins#onErrorが呼び出されたときのプログラム

ObservableFlatMapにあるコードの抜粋です。問題は、親が一旦処分されると、addThrowableへの呼び出しがfalseを返すということです。したがって、エラーは決してonErrorに伝播しません。

@Override 
public void onError(Throwable t) { 
    if (parent.errors.addThrowable(t)) { 
     if (!parent.delayErrors) { 
      parent.disposeAll(); 
     } 
     done = true; 
     parent.drain(); 
    } else { 
     RxJavaPlugins.onError(t); 
    } 
} 

この場合、私は何ができますか?私はflatMapのように動作し、私のプログラムをクラッシュさせる代わりにエラーを私のonErrorハンドラに伝播する演算子が必要です。

これは、Androidアプリが実際のシナリオです。サブスクリプションは、ユーザーがウィンドウ/アクティビティを終了すると自動的に破棄され、InterruptedIOException秒のために処理後に例外が発生することがあります。

コードは、問題がflatMap前に、あなたの.subscribeOn(Schedulers.computation())である問題

import io.reactivex.Observable; 
import io.reactivex.disposables.Disposable; 
import io.reactivex.plugins.RxJavaPlugins; 
import io.reactivex.schedulers.Schedulers; 

public class Main { 

    public static void main(String[] args) throws InterruptedException { 
    RxJavaPlugins.setErrorHandler((throwable)->{ 
     System.out.println("Please don't come through here"); 
     throwable.printStackTrace(); 
    }); 
    Disposable disposable = Observable.just(1) 
     .subscribeOn(Schedulers.computation()) 
     .flatMap((item)->{ 
      return Observable.just(1) 
       .doOnNext((arg)->Thread.sleep(1000)) 
       .doOnNext((arg)->{ 
       throw new RuntimeException("Error"); 
       }); 
     }) 
     .subscribe(System.out::println, (throwable)->{ 
      System.out.println("Please come through here"); 
      throwable.printStackTrace(); 
     }); 
    Thread.sleep(500); 
    disposable.dispose(); 
    Thread.sleep(1000); 
    } 

} 

実行出力

Please don't come through here 
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted 
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349) 
    at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99) 
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248) 
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162) 
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58) 
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248) 
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.InterruptedException: sleep interrupted 
    at java.lang.Thread.sleep(Native Method) 
    at Main.lambda$null$1(Main.java:18) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95) 
    ... 22 more 

期待/優先出力

Please come through here 
java.lang.RuntimeException: Error 
    at Main.lambda$null$2(Main.java:19) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103) 
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248) 
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162) 
    at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58) 
    at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248) 
    at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35) 
    at io.reactivex.Observable.subscribe(Observable.java:10903) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

答えて

1

を複製します。 disposeあなたは、完全なサブスクリプションを壊すフラットマップオブザーバブルを生成する中断スレッドを使用します。これを修正するには、サブスクリプションをflatMapの後または内に指定するか、別のスレッドでそれを観察する必要があります。

の作業例:

RxJavaPlugins.setErrorHandler((throwable) -> { 
     System.out.println("Please don't come through here"); 
     throwable.printStackTrace(); 
    }); 
    Disposable disposable = Observable.just(1) 
      .flatMap((item) -> { 
       return Observable.just(1) 

         .doOnNext((arg) -> Thread.sleep(1000)) 
         .doOnNext((arg) -> { 
          throw new IllegalStateException("Error"); 
         }) 
         .subscribeOn(Schedulers.computation()); 
      }) 
      .subscribe(System.out::println, 
        (throwable) -> { 
         System.out.println("Please come through here"); 
         throwable.printStackTrace(); 
        }); 
    Thread.sleep(500); 
    disposable.dispose(); 
    Thread.sleep(1000); 
+0

ねえ返事が遅れについて申し訳ありませんが、私はユニ今週で忙しかったです。私はかなりうまく混乱しています。なぜなら、これは少しうまくデバッグし始めたからです。私のサンプルソース 'Observable'が' ScalarCallable'であるため、ObservableFlatMapを使って完全にスキップしています。私は最適化のために、Observable#flatMapはObservableFlatMapの代わりにObservableScalarXMap#scalarXMapを使います。 'Observable.just(1)'を 'Observable.just(1,2)'に変更すると、エラーが発生します。 –

+0

私は問題がObservable#flatMapの最適化ではなく、あなたが購読しているオブザーバのスレッドである前に悲しいです。プログラムが実行されているメインスレッドを中断するコードです。 > Observable.just(1、2) \t \t \t \t \t \t .doOnNext((引数) - - >のThread.sleep(1000)) \t \t \t \t \t '.flatMap((アイテム):この1を確認してください\t .doOnNext((引数) - > { \t \t \t \t \t \t \t投新規のRuntimeException( "エラー"); \t \t \t \t \t \t})observeOn(SC。 hedulers.computation())) 'それはうまくいくはずです – DDH

+0

"それはうまくいくはずです "と言ったらどういう意味ですか?私はあなたのコードを差し込みました。そして、はい、それは 'ここに来てください。 'というメッセージを出力しますが、それはスレッドが中断されることはなく、' java.lang.RuntimeException:Error'がObservableによってスローされるからです。同じページにいるので、私は[ここ](https://pastebin.com/raw/BG3hWZPK)で実行したコードをアップロードしました。 –

関連する問題