加入者がイベントを消費し始めたときと終了したときをトレースしたいと思います。 Observable/Subscribersすべてに共通の方法はありますか?RxJava加入者のイベントを消費するのを追跡できますか?
1
A
答えて
6
1.xまたは2.x用ですか? 2.xでは、内部プロトコルがすべて誤ってフローを最適化しないとみなされなければならないので、かなり複雑になる可能性があります。
import io.reactivex.Observer;
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
if (!observable.getClass().getName().toLowerCase().contains("map")) {
return observer;
}
System.out.println("Started");
class SignalTracker implements Observer<Object>, Disposable {
Disposable upstream;
@Override public void onSubscribe(Disposable d) {
upstream = d;
// write the code here that has to react to establishing the subscription
observer.onSubscribe(this);
}
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onComplete() {
// handle onComplete
System.out.println("Completed");
observer.onComplete();
}
@Override public void dispose() {
// handle dispose
upstream.dispose();
}
@Override public boolean isDisposed() {
return upstream.isDisposed();
}
}
return new SignalTracker();
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
プリント:
Started
3
6
9
12
15
Completed
編集: RxJava 1バージョン
そうでなければ、それは本当のObserver
とオペレータとの間でシムれるObserver
を書くのと同じくらい簡単ですもう少しラムダが必要ですが、実行可能:
RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> {
if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) {
return onSubscribe;
}
System.out.println("Started");
return (Observable.OnSubscribe<Object>)observer -> {
class SignalTracker extends Subscriber<Object> {
@Override public void onNext(Object o) {
// handle onNext before or aftern notifying the downstream
observer.onNext(o);
}
@Override public void onError(Throwable t) {
// handle onError
observer.onError(t);
}
@Override public void onCompleted() {
// handle onComplete
System.out.println("Completed");
observer.onCompleted();
}
@Override public void setProducer(Producer p) {
observer.setProducer(p);
}
}
SignalTracker t = new SignalTracker()
observer.add(t);
onSubscribe.call(t);
};
});
Observable<Integer> observable = Observable.range(1, 5)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 3;
});
observable.subscribe(System.out::println);
Thread.sleep(6000L);
関連する問題
- 1. RxJava:ホット観測の消費者
- 2. 複数の消費者が同時に消費できますか?
- 3. Kafka10消費者対Kafka8消費者
- 4. 消費者がMQメーカのライブラリを追加することなくSOAP/JMSサービスを作成できますか?
- 5. 同じ話題から消費する複数の消費者
- 6. Erlangのメールボックスからメッセージを消費する動作を追跡する
- 7. なぜカフカの消費者は、消費に時間がかかりますか?
- 8. マルチプロセスウサギ消費者
- 9. 消費者は
- 10. が消費者
- 11. Kafka 10の消費者からコミットしないメッセージを消費する
- 12. kafkaのプロデューサーから消費者にオブジェクトを消費する方法は?
- 13. Angular.jsコントローラのUpload.progressイベントを消費する
- 14. GPUに追加/消費バッファのカウンタを設定しますか?
- 15. ウサギの消費者は、channel.basic.ackの後に消費を停止します
- 16. マウス入力イベントの追跡
- 17. すべての消費者にRabbitMQ broatcastイベント
- 18. ラムダ消費のためのs3イベント通知に追加値を渡す
- 19. カフカ消費者シェルスクリプト
- 20. カフカ - 消費者。 commitAsync
- 21. テストmasstransit消費者
- 22. カフカ消費者ポーリングタイムアウト
- 23. カフカ10.2新しい消費者対古い消費者
- 24. 消費者グループのすべての消費者はメッセージを受け取ります
- 25. プロデューサコアデータの消費者問題
- 26. 消費者のJavaスプリングウサギスレッドプール
- 27. 雲のカフカ消費者
- 28. レールアクションケーブル特定の消費者
- 29. Javaの消費者/プロデューサー
- 30. カフカ消費者がポーリングすることができません
'onSubscribe(disposable)'メソッドは、Observable開始emitイベントと呼ばれると一度呼び出されます。 'onComplete()'/'onError'が呼び出されたとき、それは終了したことを意味します。 –
ありがとうございますが、私はRxJavaHooksのような汎用ソリューションを探しています。だから私はそれぞれの観測にアクセスする必要はありません。それは、現在のjvmインスタンスで観測可能なスパイのように動作するはずです。私は参照してください – smalafeev
。 RxJavaPluginで 'setOnObservableSubscribe'を使うことができます。 –