2017-04-15 10 views
0

"Disconnecting"セクションのサンプルコードを複製しようとしていますhere。我々は、接続の署名で見たようにRxJava - ConnectableObservable、切断と再接続

を切断

、この方法はObservable.subscribeがするよう、購読を返します。この参照を使用してConnectableObservableのサブスクリプションを終了することができます。イベントがオブザーバーに伝播するのを止めますが、ConnectableObservableからイベントを解約しません。 再度connectを呼び出すと、ConnectableObservableは新しいサブスクリプションを開始し、古いオブザーバは値の再受信を開始します。 RxJava 2.0.8を使用して

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
Subscription s = connectable.connect(); 

connectable.subscribe(i -> System.out.println(i)); 

Thread.sleep(1000); 
System.out.println("Closing connection"); 
s.unsubscribe(); 

Thread.sleep(1000); 
System.out.println("Reconnecting"); 
s = connectable.connect(); 

出力

0 
1 
2 
3 
4 
Closing connection 
Reconnecting 
0 
1 
2 
... 

、私が持っている:

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
    Disposable s = connectable.connect(); 

    connectable.subscribe(new Observer<Long>() { 
     @Override 
     public void onSubscribe(Disposable d) { 

     } 

     @Override 
     public void onNext(Long aLong) { 
      Log.d("test", "Num: " + aLong); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Closing connection"); 
    s.dispose(); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Reconnecting..."); 
    connectable.connect(); 

出力

Num: 0 
Num: 1 
Num: 2 
Num: 3 
Num: 4 
Closing connection 
Reconnecting... 

ありがとうございます。

+0

私はあなたの問題を理解していません – Cochi

+0

ココは自分のコードで、接続可能なソースを切断してから再度接続した後にサブスクライバが値を受け取らない。 – veritas1

答えて

4

RxJavaではこの動作が採用されていないようです。実際の例はRx.NETのものです。 https://github.com/ReactiveX/RxJava/issues/4771

+0

おそらくはい。私が動作させることができるのは、 'connect()'をもう一度呼び出す前にもう一度 'subscribe()'したときだけです。これは一度 'dispose()'と呼ばれるようになっていますか? –

関連する問題