2017-02-12 12 views
8

publishReplay().refCount()の仕組みを理解できません。例えばrxjs 5 publishReplay refCount

https://jsfiddle.net/7o3a45L1/):

コールobserverA:5

observerB:5コールobserverB:5

observerC:5

var source = Rx.Observable.create(observer => { 
    console.log("call"); 
    // expensive http request 
    observer.next(5); 
}).publishReplay().refCount(); 

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
subscription1.unsubscribe(); 
console.log(""); 

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)}); 
subscription2.unsubscribe(); 
console.log(""); 

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)}); 
subscription3.unsubscribe(); 
console.log(""); 

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)}); 
subscription4.unsubscribe(); 

は、以下の結果を与えobserverC:5 call observerC:5

observerD:5 observerD:5 observerD:5コールobserverD:5

1)observerB、C及びDが複数回呼び出されているのはなぜ?

2)「通話」が各行に印刷され、行の先頭に印刷されないのはなぜですか?

さらに、私がpublishReplay(1).refCount()を呼び出すとobserverB、CおよびDがそれぞれ2回呼び出されます。

私が期待しているのは、新しい観察者はすべて値5を正確に1回受け取り、「呼び出し」は1回だけ表示されるということです。

答えて

13

publishReplay(x).refCount()は、次のことを行います。

  • それはX排出量まで再生ReplaySubjectを作成します。 xが定義されていない場合、ストリーム全体が再生されます。
  • これは、refCount()演算子を使用してReplaySubjectマルチキャストに対応します。これにより、同時のサブスクリプションに同じ排出が発生します。

あなたの例では、それがすべて一緒にどのように動作するか曇りいくつかの問題が含まれています。このスニペットを実行するとき、我々はそれが実際には、すべてのサブスクリプションのための新たな排出量を作成して、オブザーバーDについて重複する値を放出されていないことをはっきりと見ることができます

var state = 5 
 
var realSource = Rx.Observable.create(observer => { 
 
    console.log("creating expensive HTTP-based emission"); 
 
    observer.next(state++); 
 
// observer.complete(); 
 
    
 
    return() => { 
 
    console.log('unsubscribing from source') 
 
    } 
 
}); 
 

 

 
var source = Rx.Observable.of('') 
 
    .do(() => console.log('stream subscribed')) 
 
    .ignoreElements() 
 
    .concat(realSource) 
 
.do(null, null,() => console.log('stream completed')) 
 
.publishReplay() 
 
.refCount() 
 
; 
 
    
 
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); 
 
subscription1.unsubscribe(); 
 
    
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v)); 
 
subscription2.unsubscribe(); 
 
    
 
subscription3 = source.subscribe(v => console.log('observerC: ' + v)); 
 
subscription3.unsubscribe(); 
 
    
 
subscription4 = source.subscribe(v => console.log('observerD: ' + v)); 
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

次改訂スニペットを参照してください。 。どうして?

すべてのサブスクリプションは、次回のサブスクリプションが行われる前に購読解除されます。これにより実質的にrefCountが0に戻り、マルチキャスティングは行われません。

問題はrealSourceストリームが完了しないという事実にあります。我々はマルチキャストしていないので、次の加入者はReplaySubjectを通じてrealSourceという新鮮なインスタンスを取得し、新しい排出には既に排出されている排出が前に追加されます。

高価なHTTPリクエストを複数回呼び出すようにストリームを修正するには、ストリームを完了して、publishReplayが再購読する必要がないことが分かるようにする必要があります。

4

これは、publishReplay()を使用しているために発生します。通過するすべての値を格納するReplaySubjectのインスタンスを内部的に作成します。

Observable.createを使用しているため、source.subscribe(...)を呼び出すたびにReplaySubjectに1つの値を追加します。あなたはそれはあなたが購読し、それがそのソースに自分自身をサブスクライブするときに最初にそのバッファを発するReplaySubjectだから各行の先頭に印刷されたcallを取得していない

実装の詳細については、以下を参照してください。

publishReplay(1)を使用する場合も同様です。最初はReplaySubjectからバッファアイテムを放出した後、まだ一般observer.next(5);

+0

唯一の関連する回答 –

4

から別の項目:ストリームがあれば、少なくとも1人の加入者が存在するように共有/ホットであることrefCount手段、 - しかし、リセット/冷たいされています加入者がいない場合。

これは、何も何度も実行されないことを絶対に確かめたい場合は、refCount()を使用せず、単にストリームをconnectホットに設定する必要があります。

追加の注意:の後にobserver.complete()を追加すると、期待した結果も得られます。


サイドノート:ここで独自のカスタムObervableを作成する必要はありますか? 95%のケースでは、既存のオペレーターは所定の用途に十分である。組み合わせ

関連する問題