2016-02-01 19 views
19

RxJs 5 share()オペレータの動作は100%わかりません。latest docsを参照してください。質問hereのJsbin。RxJs 5のshare()演算子はどのように機能しますか?

Iは0〜2の一連の、二番目で区切られた各値と観測可能なを作成する場合:

var source = Rx.Observable.interval(1000) 
.take(5) 
.do(function (x) { 
    console.log('some side effect'); 
}); 

そして私は、この観測可能に2人の加入者を作成した場合:

source.subscribe((n) => console.log("subscriptor 1 = " + n)); 
source.subscribe((n) => console.log("subscriptor 2 = " + n)); 

私が手これはコンソールで:

"some side effect ..." 
"subscriptor 1 = 0" 
"some side effect ..." 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"some side effect ..." 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"some side effect ..." 
"subscriptor 2 = 2" 

私は各サブスクリプションが同じObservableを購読すると思っていましたが、それは事実ではないようです!その購読の行為のように完全に別のオブザーバブルを作成します!

しかしshare()オペレータが観察可能なソースに追加された場合:

var source = Rx.Observable.interval(1000) 
.take(3) 
.do(function (x) { 
    console.log('some side effect ...'); 
}) 
.share(); 

その後、我々はこれを取得:

"some side effect ..." 
"subscriptor 1 = 0" 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"subscriptor 2 = 2" 

私はshare()せずに期待するものはどれ。

ここで何が起こっているのですか?share()オペレータはどのように機能しますか?各サブスクリプションは新しいObservableチェーンを作成しますか?

答えて

15

ドキュメントリンクがRxJS v4のように見えている間は、RxJS v5を使用していることに注意してください。具体的なことは覚えていませんが、特にshareのオペレータはいくつかの変更を行ったと思いますが、特に完了と再登録の際には言い訳をしません。

あなたのスタディに示されているように、あなたの期待はライブラリデザインに対応していません。オブザーバブルは、データフローを遅延的にインスタンス化し、具体的には、サブスクライバがサブスクライブするときにデータフローを開始します。 2番目のサブスクライバが同じオブザーバブルにサブスクライブすると、最初のサブスクライバであるかのように、別の新しいデータフローが開始されます(したがって、各サブスクリプションは新しいオブザーバブルチェーンを作成します)。これは、RxJSの用語ではコールドオブザーバブルと呼ばれるもので、RxJSのデフォルト動作です。データが到着した瞬間に加入者にデータを送信するオブザーバブルが必要な場合、これは熱い観測可能なものであり、ホット観測可能な方法の1つはshareオペレータを使用することです。

Hot and Cold observables : are there 'hot' and 'cold' operators?(RxJS v4では有効ですが、そのほとんどはv5で有効です)という図式のサブスクリプションとデータフローがあります。

  1. 加入者> 0
  2. と、観察の数は

Scenario1完了していない:加入者数をこれらの2つの条件が満たされた場合

10

シェアは、観察可能な "ホット" を作ります> 0であり、新しいサブスクリプションの前に観察可能なものが完了していない

var shared = rx.Observable.interval(5000).take(2).share(); 
var startTime = Date.now(); 
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 3000); 

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds 
// another emission for both observers at: startTime + 10 seconds 

シナリオ2:新しいサブスクリプションの前にサブスクライバの数がゼロです。 「冷たい」となっ

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer1.unsubscribe(); 
}, 1000); 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time 
}, 3000); 
// observer2's onNext is called at startTime + 8 seconds 
// observer2's onNext is called at startTime + 13 seconds 

シナリオ3:新しいサブスクリプションの前に完成した観測可能。 「寒い」

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
     console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
    }; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 12000); 

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs 
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs 
関連する問題