2017-02-06 4 views
2

私はRxJavaの方が新しく、他の多くの人たちも例外処理の頭を奪おうとしています。私はかなりの数の記事をオンラインで読むことができます(ここでの説明はhow to handle exceptions thrown by observer's onNextです)。と思っています。上記の議論ではRxJava 2.0 - publish()のキャッチされていないサブスクライバエラーのressourcesを処理します。refCount()

、ポスターの一つは、例外が加入者にスローされたとき、RxJavaは以下のないことを、こう述べています。失敗をログに記録し、それをイベントの送信を停止するように、一般的な処理を実装

(その種類の)他のサブスクリプションと一緒に を運んでください。

これは多かれ少なかれ私が見るものですが、私が問題を抱えているのは「すべてのリソースをきれいにする」ということだけです。

受信メッセージごとに非同期イベントソース(JMSキューなど)とonNext()をリッスンするObservableを作成したいとします。私は多くの加入者/オブザーバーのためのメッセージリスナーを再利用したいので、私が直接作成した観察可能で購読するが、使用しません

Observable<String> observable = Observable.create(s -> { 
    createConnectionToBroker(); 
    getConsumer().setMessageListener(message -> s.onNext(transform(message))); 
    s.setDisposable(new Disposable() { 
    public void dispose() { 
     tearDownBrokerConnection(); 
    } 
    }); 
}); 

:だから、(擬似)コードの中で、私は次のように何かをするだろう代わりにpublish().refCount()チーム。これに似た何か:

Observable<String> observableToSubscribeTo = observable.publish().refCount(); 

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...); 
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...); 

このすべての作品予想通り。コードは、最初のサブスクリプションが確立されたときにのみJMSに接続し、最後のオブザーバがdispose()のときにブローカへの接続が閉じられます。

ただし、サブスクライバがonNext()であるときに例外をスローすると、ものが乱雑になるようです。予想どおり、放映されたオブザーバーは裸であり、新しいイベントが公開されるたびに通知されなくなります。私の問題は、残りのサブスクライバがすべてdispose() dであるときに、メッセージブローカへの接続を維持するObservableが通知されなくなったことを示しています。あたかも例外を投げた加入者がある種のゾンビ状態にあるかのように見えます。イベント配信に関しては無視されますが、最後のサブスクライバがdispose() dであるときにObservableルートが何らかの理由で通知されないようにします。

RxJavaは、オブザーバーが最終的な例外をスローしないで、むしろ正しく処理することを確実にすると理解しています。残念ながら、Observableを呼び出し元に返すライブラリを提供したい場合、私は自分の購読者を何も制御できません。つまり、私は決して愚かな観察者から私の図書館を守ることはできません。

だから私は自分自身に尋ねています:私はここに何かを逃していますか?加入者がスローしたときに適切にクリーンアップする機会はありませんか?これはバグですか、それとも私がライブラリを理解していないのですか?

洞察力は大変高く評価されています。

答えて

1

(JMSを必要とせずに)問題があることを示す単体テストを示すことができたら、それは素晴らしいものです。

また、RxJava 2のonNextはスローしないでください。そうであれば、未定義の振る舞いです。、

.compose(o -> v -> o.safeSubscribe(v)) 

または

.compose(new ObservableTransformer<T>() { 
    @Override public Observable<T> apply(final Observable<T> source) { 
     return new Observable<T>() { 
      @Override public void subscribeActual(Observer<? super T> observer) { 
       source.safeSubscribe(observer); 
      } 
     }; 
    } 
}) 
+1

を@akarnokdこんにちは:あなたの消費者を信頼していない場合は、下流の不正な動作に対する保護追加代わりに、プレーンsubscribesafeSubscribeを行い、エンド観察可能な変圧器を持つことができます これをお探しいただきありがとうございます。私はちょうど私がもはやそれを再現することができないことを学ぶために、行動を示すために基本的な単体テストを書きましたか? :-oかなり恥ずかしい、私は言う必要があります:-(私は問題があなたの時間を無駄にするための謝罪!!!!!!!!!!!!!!!!!!! そしてsafeSubscribe()に関するコメントのおかげで。あなたがコードを使用することを信じることはできませんが、私は本当にそれを固めたいと思います.1つの悪意のある加入者は、他のよく振る舞うものに副作用を持つべきではありません。 –

関連する問題