2016-11-14 13 views
2

RXJavaシェア()Observable.createで動作していない()

public class ConnectObs { 
public static void main(String[] args) { 

    Observable<Integer> intsObservable = Observable.just(1, 2); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("A " + s)); 
    intsObservable.subscribe(s->System.out.println("B " + s)); 

    intsObservable = Observable.create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
    }); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("C " + s)); 
    intsObservable.subscribe(s->System.out.println("D " + s)); 
    } 
} 

はではなく、D用のA、B及びCの結果を生成するこのコード - それはなぜですか?以下

結果:

A 1 
A 2 
B 1 
B 2 
C 1 
C 2 

答えて

2

Observable.justと(道で安全に構築されていません)、カスタム観察可能との重要な違いは、Cサブスクリプションがまだときにアクティブであるので、あなたは、ストリームを完了していないということですDサブスクリプションが発生するので、Dは、それ以上来ない排出量を待つだけです。

あなたの作成は次のようになります。加入者に少し友好であるために

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
     s.onCompleted(); 
}) 
//prevent MissingBackpressureException 
.onBackpressureBuffer(); 

あなたもunsubscribeチェックを追加できます:

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     if (!s.isUnsubscribed()) 
      s.onNext(2); 
     if (!s.isUnsubscribed()) 
      s.onCompleted(); 
}).onBackpressureBuffer(); 
関連する問題