2017-02-16 4 views
0

PublishProcessorがあり、特定のチェックを実行するには.doOnSubscribe(checkCondition)演算子が呼び出されます。 checkConditionは、UnsupportedOperationExceptionをスローすることを意図しており、その後、ストリームをサブスクライバにonError()メソッドまで伝搬する必要があります。代わりにUndeliverableExceptionがスローされ、プロセスがクラッシュします。doOnSubscribe()からスローされたときに例外がonError()に伝播されない

publishProcessor 
    .filter(() -> { // predicate }) 
    .observeOn(scheduler) 
    .doOnSubscribe(checkCondition) 
    .to((sourceFlowable) -> new FancyFlowable(sourceFlowable))) 
    .safeSubscribe(subscriber); 

ここで何が問題になっているのでしょうか? checkConditionから投げられた例外がサブスクライバのonErrorに伝播されないのはなぜですか?

+0

観察された行動は、バグによるものです。 [#5103](https://github.com/ReactiveX/RxJava/pull/5103)で修正してください。 – akarnokd

答えて

0

おそらくそれは意味をなさないためです。 observableチェーンが適切に初期化される前にdoOnSubscribeが実行されます。このようなことを考えましたか?

if(checkCondition) { 
    Observable.error(...).safeSubscribe(subscriber); 
} 

その後、元のコード(sans doOnSubscribe)を続けます。

+0

Observableが準備された後、加入者が加入する前に条件が変わる可能性があるため、条件を前もって調べても問題は解決しません。 –

+1

うわー、私は知っている! Observable.defer(() - > checkCondition?error(...):publishProcessor) 'を呼び出し、正常にチェーンします。 –

1

これはRxJava 2のdoOnSubscribeオペレータのバグです。 onSubscribeコールバックがクラッシュした場合、その時点のThrowableonErrorで合法的に通知されます。私はすぐに修正を掲載する予定です。

は今のところ、this commentはあなたのための回避策ことがあります

Observable.defer(() -> checkCondition ? error(...) : publishProcessor)

関連する問題