問題を再現するサンプルプログラムがあります。Observable.flatMapを使用している場合のエラー処理
は問題:
- は、前述の
Observable
に購読一部Observable
- に
flatMap
変換を適用Observable
前に、サブスクリプション、前述のサブスクリプションのどこか - 処分を保存するには、自然に
- 終了しますマッパー関数によって返された
Observable
は、Exception
- Flatmapのオペレータが
Exception
処理する方法を知らない、/それ、プログラムがクラッシュを発生させず終了し
優先/期待される動作:エラーがクラッシュするのではなく、私のonError
ハンドラに伝播する必要があります
RxJavaPlugins#onError
が呼び出されたときのプログラム
ObservableFlatMap
にあるコードの抜粋です。問題は、親が一旦処分されると、addThrowable
への呼び出しがfalseを返すということです。したがって、エラーは決してonError
に伝播しません。
@Override
public void onError(Throwable t) {
if (parent.errors.addThrowable(t)) {
if (!parent.delayErrors) {
parent.disposeAll();
}
done = true;
parent.drain();
} else {
RxJavaPlugins.onError(t);
}
}
この場合、私は何ができますか?私はflatMap
のように動作し、私のプログラムをクラッシュさせる代わりにエラーを私のonError
ハンドラに伝播する演算子が必要です。
これは、Androidアプリが実際のシナリオです。サブスクリプションは、ユーザーがウィンドウ/アクティビティを終了すると自動的に破棄され、InterruptedIOException
秒のために処理後に例外が発生することがあります。
コードは、問題がflatMap
前に、あなたの.subscribeOn(Schedulers.computation())
である問題
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class Main {
public static void main(String[] args) throws InterruptedException {
RxJavaPlugins.setErrorHandler((throwable)->{
System.out.println("Please don't come through here");
throwable.printStackTrace();
});
Disposable disposable = Observable.just(1)
.subscribeOn(Schedulers.computation())
.flatMap((item)->{
return Observable.just(1)
.doOnNext((arg)->Thread.sleep(1000))
.doOnNext((arg)->{
throw new RuntimeException("Error");
});
})
.subscribe(System.out::println, (throwable)->{
System.out.println("Please come through here");
throwable.printStackTrace();
});
Thread.sleep(500);
disposable.dispose();
Thread.sleep(1000);
}
}
実行出力
Please don't come through here
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at Main.lambda$null$1(Main.java:18)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
... 22 more
期待/優先出力
Please come through here
java.lang.RuntimeException: Error
at Main.lambda$null$2(Main.java:19)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
ねえ返事が遅れについて申し訳ありませんが、私はユニ今週で忙しかったです。私はかなりうまく混乱しています。なぜなら、これは少しうまくデバッグし始めたからです。私のサンプルソース 'Observable'が' ScalarCallable'であるため、ObservableFlatMapを使って完全にスキップしています。私は最適化のために、Observable#flatMapはObservableFlatMapの代わりにObservableScalarXMap#scalarXMapを使います。 'Observable.just(1)'を 'Observable.just(1,2)'に変更すると、エラーが発生します。 –
私は問題がObservable#flatMapの最適化ではなく、あなたが購読しているオブザーバのスレッドである前に悲しいです。プログラムが実行されているメインスレッドを中断するコードです。 > Observable.just(1、2) \t \t \t \t \t \t .doOnNext((引数) - - >のThread.sleep(1000)) \t \t \t \t \t '.flatMap((アイテム):この1を確認してください\t .doOnNext((引数) - > { \t \t \t \t \t \t \t投新規のRuntimeException( "エラー"); \t \t \t \t \t \t})observeOn(SC。 hedulers.computation())) 'それはうまくいくはずです – DDH
"それはうまくいくはずです "と言ったらどういう意味ですか?私はあなたのコードを差し込みました。そして、はい、それは 'ここに来てください。 'というメッセージを出力しますが、それはスレッドが中断されることはなく、' java.lang.RuntimeException:Error'がObservableによってスローされるからです。同じページにいるので、私は[ここ](https://pastebin.com/raw/BG3hWZPK)で実行したコードをアップロードしました。 –