2
私は、Reactive Programming with RxJavaのバージョン2ではなく、バージョン1を対象とした例を使って作業しています。無限ストリームの紹介には、次の例があります。並行性):RxJavaはisUnsubscribedに相当します
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler =() -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
しかしながら、RxJava 2に、create()
メソッドに渡されるラムダ式は、型ObservableEmitter
であり、これはisUnsubscribed()
方法を持ちません。私はWhat's Different in 2.0を見てきましたが、リポジトリの検索も実行しましたが、そのような方法は見つかりませんでした。
2.0と同じ機能をどのように実現できますか?
編集(kotlin使用n.b.)下記のような溶液を含むようには:あなたが観察可能に加入した後
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
偉大な、そのトリックでした。ありがとう。 – amb85