2016-03-28 7 views
1

私はストリームが少しの運をもって最新の値だけを再生しようとしています。基本的に、私はreplaySubject(1)を手に入れることはできません。.onNextです。私がやりたいことは、それを取って別のものにマップし、その後に初期値を追加することです。私の最初の本能は、以下ましたReplaySubjectのような観察可能な行為を作る

stream: test -1 
stream: test 0 

を印刷し

var testInStream = new Rx.ReplaySubject(1); 
var testOutStream = testInStream.startWith(-1); 

testInStream.onNext(0); 

setTimeout(function(){ 
     testOutStream.subscribe(function(x){ 
     console.log("stream:", x); 
    }); 
}, 1000); 

しかし、私は唯一のstream: test 0を印刷する方法を見つけるしたいと思います。私が思い付くことができます最も近いものは、私が欲しいものである

stream: test 0 

を返し

var testInStream = new Rx.ReplaySubject(1); 
var testOutStream = testInStream.startWith(-1); 

testInStream.onNext(0); 

var wrapperSubject = new Rx.ReplaySubject(1); 
testOutStream.subscribe(function(new_val){ wrapperSubject.onNext(new_val) }); 

setTimeout(function(){ 
    wrapperSubject.subscribe(function(x){ 
    console.log("stream:", x); 
    }); 
}, 1000); 

です。しかし、私はむしろ中間のreplaySubjectを作成しないで、testOutStream.latest()testOutStream.replay(1)のようなものを呼び出すことを好みますが、まだそれらのいずれかを動作させることはできません。

私の例ではJS Binのヘルプがあります。

答えて

0

あなたはshareReplay(1)を使用することができ、サンプルコードがhere

var validation$ = Rx.Observable 
.fromEvent(document.getElementById('validation'), 'click') 
.map(function(_, index){ 
    return ++counter; 
    }) 
.do(emits(ta_validation, 'index')) 
.shareReplay(1) 

validation$.subscribe(function(){}) 

setTimeout(function() { 
    validation$ 
    .do(emits(ta_result, 'second subscription')) 
    .subscribe(function(){ });}, 3000) 

である私は単純にreplay(1)connectと試みたが、妙ストリーム全体の値が再生されます。だから、shareReplayを使うのが一番です。

また次の演算子は、あなたがその「最新の」値でやりたい正確に何に応じて、使用のものとすることができることに注意して取る:

+0

RxJs 5ではshareReplayが削除されているようですhttps://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md –

+0

はい、OPは '.onNext'を使って件名を使用していますので、彼はRxjs v4またはそれ以下を使用しています。実際にはjsbinから、OPは4.0.6を使用します – user3743222

0

ありよりクリーンなソリューション。ほとんど知られていない秘密は、オブザーバ(サブジェクトなど)をサブスクリプションハンドラとして渡すことができるということです。だから、仕事を行うコードは次のとおりです。

var testInStream = new Rx.ReplaySubject(1); 
// var testOutStream = testInStream.startWith(-1); 

var testOutStream = new Rx.ReplaySubject(1); 
testInStream.startWith(-1).subscribe(testOutStream); 

testInStream.onNext(0); 

setTimeout(function(){ 
     testOutStream.subscribe(function(x){ 
     console.log("stream:", x); 
    }); 
}, 1000); 

this jsbinを参照してください。