2017-06-13 9 views
2

私は、Reactive Programming with RxJavaのバージョン2ではなく、バージョン1を対象とした例を使って作業しています。無限ストリームの紹介には、次の例があります。並行性):RxJavaはisUnsubscribedに相当します

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { 
    Runnabler =() -> { 
     BigInteger i = ZERO; 
     while (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(i); 
      i = i.add(ONE); 
     } 
    }; 
    new Thread(r).start(); 
}); 

... 

Subscription subscription = naturalNumbers.subscribe(x -> log(x)); 
/* after some time... */ 
subscription.unsubscribe(); 

しかしながら、RxJava 2に、create()メソッドに渡されるラムダ式は、型ObservableEmitterであり、これはisUnsubscribed()方法を持ちません。私はWhat's Different in 2.0を見てきましたが、リポジトリの検索も実行しましたが、そのような方法は見つかりませんでした。

2.0と同じ機能をどのように実現できますか?

編集(kotlin使用n.b.)下記のような溶液を含むようには:あなたが観察可能に加入した後

val naturalNumbers = Observable.create<BigInteger> { emitter -> 
    Thread({ 
     var int: BigInteger = BigInteger.ZERO 
     while (!emitter.isDisposed) { 
      emitter.onNext(int) 
      int = int.add(BigInteger.ONE) 
     } 
    }).start() 
} 

val first = naturalNumbers.subscribe { log("First: $it") } 
val second = naturalNumbers.subscribe { log("Second: $it") } 

Thread.sleep(5) 
first.dispose() 
Thread.sleep(5) 
second.dispose() 

答えて

2

Disposableが返されます。ローカル変数に保存してdisposable.isDisposed()をチェックして、まだ購読中であるかどうかを確認することができます。

+1

偉大な、そのトリックでした。ありがとう。 – amb85

関連する問題