私は冷たくしたい、つまり、最初の観察者がそれを購読したときにアイテムを放出し始めるだけです。Observable
です。すべてのオブザーバーがObservableから登録を解除したときにイベントがありますか?
次に、すべてのオブザーバーが同じ観測可能から退会したときに、ソースからすべてのリソースを確実に解放したいと思います。それは可能ですか?
私は冷たくしたい、つまり、最初の観察者がそれを購読したときにアイテムを放出し始めるだけです。Observable
です。すべてのオブザーバーがObservableから登録を解除したときにイベントがありますか?
次に、すべてのオブザーバーが同じ観測可能から退会したときに、ソースからすべてのリソースを確実に解放したいと思います。それは可能ですか?
あなたはあなたのためにこれを処理するためにConnectedObservableの機能を使用することができます。
//Replace Observable.range(1,1000) with your Observable implementation
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share();
share
方法は、方法publish
とrefCount
を呼び出します。
publish
は、通常のObservableをConnectedObservableに変換します。これは、connect
と呼んですぐにアイテムを放出します。したがって、技術的には多くのオブザーバーを希望どおりに購読してから、connect
に電話すると、同時にすべてのアイテムの発光が開始されます。
refCount
はConnected Observableを伝統的なものに戻しますが、新しい特性を備えています。追加された利点は次のとおりです。この観察可能なのは寒い(サブスクライバがサブスクライブするときに発光し、内部ではpublish
で作成された元のConnectedObservableのconnect
メソッドを内部的に呼び出します)、元のConnectedObservableに接続されているサブスクライバの数を記録します。すべてのサブスクライバがサブスクライブしなくなると、ConnectedObservableソースから引用されなくなります。そのため、1つのサブスクリプションを処理するだけで済むため、ロジックははるかに簡単になります。
ここで共有操作のために良いの図があります:代わりhttp://reactivex.io/RxJava/javadoc/rx/Observable.html#share()
、これは十分な柔軟性が得られない場合、私はあなたが冷たい、観察を作成するためにdefer
を使用して簡単にこの動作を実現することができるはずだと思います、およびdoOnSubscribe
およびdoOnUnsubscribe
の方法が挙げられる。
例:
Observable.defer(() -> {
final AtomicInteger counter = new AtomicInteger();
return Observable.range(1, 1000)
.doOnSubscribe(() -> counter.incrementAndGet())
.doOnUnsubscribe(() -> {
if (counter.decrementAndGet() == 0) {
freeResources();
}
});
});
この観察可能な数のシーケンス(自分の観測可能な実装でこれを置き換える)とすぐに第1の加入者が加入を放出が開始されます、それは各サブスクリプションでカウンタを増加させ、そして解放されますすべての購読者が購読を停止するとリソースを使います(必要なものはfreeResourcesに置き換えてください)。