2017-04-04 9 views
1

私は特定の状況のた​​めにRx戦略を思いつくために苦労してきました。私は誰かが私を正しい方向に向けることを望んでいる。RxJS 5:最新の値を保持しながらソケットフィードをスキップ

基本的には、ブール値に基づいてスキップしたいソケットフィードがあります。ストリームがソケットをスキップしているときは、ソケットから送信された最新の値の実行バッファを保持する必要があります。

私はソケットイベントをスキップもはや午前たら、それはだけで、他の条件(ブール値)の下で、スキップ、再スタートイベント

をソケットに耳を傾けるされたときに放出された最後の値をストリームを押し下げていません

だから基本的には:場合、 ソケットをスキップして、最後の値を適用し、再び待機を開始すると

  • はフィード
  • takeWhile(bool)
  • をソケットに耳を傾けます

遠く得るが、これは私が持っているものではありませんでした:おそらく

Rx.Observable.interval(1000) 
.skipWhile(()=>isSkipping) 
.bufferWhileSkipping?? 
.applySkippedValuesAfterSkipping(ifisReapply)?? 
.subscribe(val=>console.log(val)); 

skipWhileは正しいアプローチではなく、一種の感覚を作っただけだった...

答えて

1

をあなたは(私はあなたのisSkippingが観測可能と仮定している)以下と同様にそれを行うことができます:

const isSkipping = new BehaviorSubject(false); 

Observable.interval(100) 
    .take(20) 
    .window(isSkipping) 
    .withLatestFrom(isSkipping) 
    .switchMap(([observable, skipping]) => skipping 
    ? observable.takeLast(1).map(val => 'last:' + val) 
    : observable) 
    .subscribe(console.log); 

setTimeout(() => isSkipping.next(true), 500); 
setTimeout(() => isSkipping.next(false), 1050); 
setTimeout(() => isSkipping.next(true), 1500); 
setTimeout(() => isSkipping.next(false), 1850); 

エヴァーy時間isSkippingwindowの値を出力します。演算子は、すべてを再発行するか、スキップするとtrueに設定されたときに.takeLast(1)演算子をチェーンする新しいObservableを作成します。

上記の例では、コンソールに次の出力を印刷する:

0 
1 
2 
3 
last:9 
10 
11 
12 
13 
last:16 
17 
18 
19 
関連する問題