2017-01-08 4 views
0

私はtakeUntil()の非常に奇妙な動作に陥りました。およそ500msのその作成後Rxjs:takeUntil(timer)で観測可能です。タイマーが遅くなりました。

let finish = Observable.timer(3000); 

は、その後、私はいくつかの時間を待って、私はタイマー「ダニ」の後に発光停止することが観察できると期待される

// 2500 ms later 
someObservable.takeUntil(finish); 

を呼び出し、すなわち:私は、観察可能なタイマーを作成します。実際には、は、作成後に3000msの間、タイマーが「刻む」瞬間を超えて、を放り続けます。これは、絶対時間値を含むDateオブジェクトを使用してタイマーを作成した場合には発生しません。

これは仕様です。はいの場合、説明は何ですか?

ここでのNode.jsで実行可能な完全なコード、(それがnpm install rxが必要です)です:私は絶対時間を使用してタイマーを作成する場合

2539 'seq' 0 
3001 'FINISH' 0 
3005 'FINISH' 'DONE' 
3007 'seq' 1 
3506 'seq' 2 
4006 'seq' 3 
4505 'seq' 4 
5005 'seq' 5 
5506 'seq' 6 
5507 'seq' 'DONE' 

let {Observable, Subject} = require("rx") 
let start = new Date().getTime(); 
function timeMs() { return new Date().getTime() - start }; 

function log(name, value) { 
    console.log(timeMs(), name, value); 
} 

Observable.prototype.log = function(name) { 
    this.subscribe(v=>log(name,v), 
        err=>log(name, "ERROR "+err.message), 
        ()=>log(name, "DONE")); 
    return this; 
} 

let finish = Observable.timer(3000).log("FINISH"); 
setTimeout(()=>Observable.timer(0,500).takeUntil(finish).log("seq"), 2500); 

これは次の出力を生成し

let finish = Observable.timer(new Date(Date.now()+3000)).log("FINISH"); 

次に、期待通りに動作します。

2533 'seq' 0 
3000 'seq' 'DONE' 
3005 'FINISH' 0 
3005 'FINISH' 'DONE' 

この動作は、さまざまな状況でむしろ一貫しているようです。間隔をとり、mergeMap()またはswitchMap()を使用して子シーケンスを作成すると、結果は似ています。子シーケンスは終了イベントを超えて発光し続けます。

思考?

答えて

3

寒さの最初のルールを忘れていますObservables:各サブスクリプションは新しいストリームです。

logオペレータにはバグがあります。 オペレータに渡すと暗黙のうちに、Observableに一度加入して(したがって最初のサブスクリプションを作成して)、元のObservableを返信し、に再度をサブスクライブします。実際には実際には2つのストリームが正しく動作していますが、どちらも正しく動作しています。

基本的には、サブスクリプションが発生するまでの時間であるではなく、特定の時刻に各ストリームを放出するように設定しているため、絶対的な場合に機能します。

あなたはそれが動作確認したい場合は、私はあなたがあなたの実装を変更を示唆している:完全を期すため

let start = new Date().getTime(); 
function timeMs() { return new Date().getTime() - start }; 

function log(name, value) { 
    console.log(timeMs(), name, value); 
} 

Observable.prototype.log = function(name) { 
    // Use do instead of subscribe since this continues the chain 
    // without directly subscribing. 
    return this.do(
     v=>log(name,v), 
     err=>log(name, "ERROR "+err.message), 
    ()=>log(name, "DONE") 
    ); 
} 

let finish = Observable.timer(3000).log("FINISH"); 

setTimeout(()=> 
    Observable.timer(0,500) 
    .takeUntil(finish) 
    .log("seq") 
    .subscribe(), 
2500); 
+0

です:)あなたは、timer()が購読された後にのみ時間をカウントし始め、新しい購読ごとにカウントが新たに開始されると言っていますか?率直に言って、タイマー()メソッドのドキュメントには、おそらく "async IScheduler"という言葉を除いては何も言及されていませんが、おそらく.NETの実装からコピーされてJavaScriptの文脈では私にはほとんど意味がありません。 –

+0

心配する必要はなく、よくある間違いです。これは 'timer'演算子に固有のものではなく、Observablesに固有のより一般的な振る舞いです。彼らは怠惰に評価されることを意図しています。 [hot vs. cold](https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339)のこのかなり良いプライマーを見てください。 – paulpdaniels

0

、ここでは実際に私が望んでいたコードです。 Observable.publish().connect()を使用すると、すぐにティッキングを開始する「ホット」タイマーが作成され、すべてのサブスクライバに同じ時間が保持されます。また、@ paulpdanielsが示唆するように、 "log"メソッドで不要なサブスクリプションを避けることもできます。

警告:競合状態に注意してください。タイマーがチェックされた後に子シーケンスが開始されると、それは決して停止しません。私は最初の場所で寒観測のルールを知らなかった:2500年から3500

let {Observable, Subject, Scheduler, Observer} = require("rx") 
let start = new Date().getTime(); 
function timeMs() { return new Date().getTime() - start }; 

function log(name, value) { 
    console.log(timeMs(), name, value); 
} 

var logObserver = function(name) { 
    return Observer.create( 
     v=>log(name,v), 
     err=>log(name, "ERROR "+err.message), 
    ()=>log(name, "DONE")); 
} 

Observable.prototype.log = function(name) { return this.do(logObserver(name)); } 

Observable.prototype.start = function() { 
    var hot = this.publish(); hot.connect(); 
    return hot; 
} 

let finish = Observable.timer(3000).log("FINISH").start(); 

setTimeout(()=> 
    Observable.timer(0,500) 
    .takeUntil(finish) 
    .log("seq") 
    .subscribe(), 
2500); 

に、最後の行の変更タイムアウトを実証するために、出力は、あなたが私にあまりにも多くのクレジットを与えている

2549 'seq' 0 
3002 'FINISH' 0 
3006 'seq' 'DONE' 
3011 'FINISH' 'DONE' 
関連する問題