2017-05-29 11 views
1

加入者がイベントを消費し始めたときと終了したときをトレースしたいと思います。 Observable/Subscribersすべてに共通の方法はありますか?RxJava加入者のイベントを消費するのを追跡できますか?

+0

'onSubscribe(disposable)'メソッドは、Observable開始emitイベントと呼ばれると一度呼び出されます。 'onComplete()'/'onError'が呼び出されたとき、それは終了したことを意味します。 –

+0

ありがとうございますが、私はRxJavaHooksのような汎用ソリューションを探しています。だから私はそれぞれの観測にアクセスする必要はありません。それは、現在のjvmインスタンスで観測可能なスパイのように動作するはずです。私は参照してください – smalafeev

+0

。 RxJavaPluginで 'setOnObservableSubscribe'を使うことができます。 –

答えて

6

1.xまたは2.x用ですか? 2.xでは、内部プロトコルがすべて誤ってフローを最適化しないとみなされなければならないので、かなり複雑になる可能性があります。

import io.reactivex.Observer; 

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> { 
    if (!observable.getClass().getName().toLowerCase().contains("map")) { 
     return observer; 
    } 

    System.out.println("Started"); 

    class SignalTracker implements Observer<Object>, Disposable { 
     Disposable upstream; 
     @Override public void onSubscribe(Disposable d) { 
      upstream = d; 
      // write the code here that has to react to establishing the subscription 
      observer.onSubscribe(this); 
     } 
     @Override public void onNext(Object o) { 
      // handle onNext before or aftern notifying the downstream 
      observer.onNext(o); 
     } 
     @Override public void onError(Throwable t) { 
      // handle onError 
      observer.onError(t); 
     } 
     @Override public void onComplete() { 
      // handle onComplete 
      System.out.println("Completed"); 
      observer.onComplete(); 
     } 
     @Override public void dispose() { 
      // handle dispose 
      upstream.dispose(); 
     } 
     @Override public boolean isDisposed() { 
      return upstream.isDisposed(); 
     } 
    } 
    return new SignalTracker(); 
    }); 

    Observable<Integer> observable = Observable.range(1, 5) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.computation()) 
     .map(integer -> { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return integer * 3; 
     }); 

    observable.subscribe(System.out::println); 

    Thread.sleep(6000L); 

プリント:

Started 
3 
6 
9 
12 
15 
Completed 

編集: RxJava 1バージョン

そうでなければ、それは本当のObserverとオペレータとの間でシムれるObserverを書くのと同じくらい簡単ですもう少しラムダが必要ですが、実行可能:

RxJavaHooks.setOnObservableStart((observable, onSubscribe) -> { 
    if (!onSubscribe.getClass().getName().toLowerCase().contains("map")) { 
     return onSubscribe; 
    } 

    System.out.println("Started"); 

    return (Observable.OnSubscribe<Object>)observer -> { 
     class SignalTracker extends Subscriber<Object> { 
      @Override public void onNext(Object o) { 
       // handle onNext before or aftern notifying the downstream 
       observer.onNext(o); 
      } 
      @Override public void onError(Throwable t) { 
       // handle onError 
       observer.onError(t); 
      } 
      @Override public void onCompleted() { 
       // handle onComplete 
       System.out.println("Completed"); 
       observer.onCompleted(); 
      } 
      @Override public void setProducer(Producer p) { 
       observer.setProducer(p); 
      } 
     } 
     SignalTracker t = new SignalTracker() 
     observer.add(t); 
     onSubscribe.call(t); 
    }; 
    }); 

    Observable<Integer> observable = Observable.range(1, 5) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.computation()) 
     .map(integer -> { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return integer * 3; 
     }); 

    observable.subscribe(System.out::println); 

    Thread.sleep(6000L); 
+0

説明に例を追加しました。この解決法はそれに対しては機能しません。 – smalafeev

+0

「うまくいかない」とはどういう意味ですか? – akarnokd

+0

加入者がすべてのイベントの処理を完了する前に、「完了」が印刷されます。 – smalafeev

関連する問題