2017-12-03 27 views
0

こんにちは。私はRxJavaで何らかのMVCを作ろうとしています。したがって、考えられるのは であり、常に一定のサブスクリプションを作成し、それは常に観察される にサブスクライブされます。また、この観察可能な部分は、ネットワークコールなど、いつでも再実行することができます。私は、この機能をテストするには、このコードを試してみました:接続可能なobservableでのRxJava再接続が機能しない

class Main { 
    companion object { 
     @JvmStatic 
     fun main(args: Array<String>) { 
      val obs = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io()) 
       .publish() 

      val s1 = obs 
       .doOnUnsubscribe { System.out.println("s1 unsubscribed") } 
       .subscribe { System.out.println("first: $it") } 

      val s = obs.connect() 

      Thread.sleep(4000) 

      System.out.println("unsubscribe") 
      s.unsubscribe() 

      Thread.sleep(1000) 

      System.out.println("connect") 
      val obsS2 = obs.connect() 

      System.out.println("isUnsubscribed: ${s1.isUnsubscribed}") 

      Thread.sleep(10000) 
     } 
    } 
} 

これは私が期待したものである:

first: 0 
first: 1 
first: 2 
unsubscribe 
connect 
isUnsubscribed: false 
first: 0 
first: 1 
... 
Process finished with exit code 0 

は、これが実際の結果である:

first: 0 
first: 1 
first: 2 
unsubscribe 
connect 
isUnsubscribed: false 

Process finished with exit code 0 

私はいくつかの記事(RxJava - ConnectableObservable, disconnecting and reconnectinghttps://github.com/Froussios/Intro-To-RxJava/issues/18を見つけました)、これはバグだと人々は言うが、このバグはかなり長い間存在している。

質問:これは本当にバグですか?そうでない場合、どうすればそのような行動を取ることができますか?

編集:バージョン1.3.4、1.2.10、1.1.10でテスト は、1.0.10

+0

はあなたにもRx2のを試してみましたか? – tynn

答えて

0

それはバグではなくRxJavaのConnectableObservableのプロパティではありません:あなたunsubscribeかの接続、以前に加入し、消費者再接続しても、それ以上のイベントは受信されません。

あなたは、実際のソースに加入すると退会、PublishSubjectを使用してRx.NETの動作と同様の効果を達成することができます

PublishSubject subject = PublishSubject.create(); 

subject.subscribe(System.out::println); 

Subscription s = source.subscribe(subject::onNext); 

s.unsubscribe(); 

Subscription s2 = source.subscribe(subject::onNext); 
+0

ありがとう!私はすでにこのパターンを使わないように私のアーキテクチャを変更しました)) –