2017-05-18 21 views
4

RxJSには、観測可能な信号が発生するたびに項目をバッファリングして1つずつ出力する演算子がありますか?バッファのように並べ替えますが、バッファ全体を各信号にダンプするのではなく、信号ごとに特定の数をダンプします。それは観測可能な信号によって放出される数をダンプすることさえできます。RxJSのキュー演算子

Input observable: >--a--b--c--d--| 
Signal observable: >------1---1-1-| 
Count in buffer: !--1--21-2-121-| 
Output observable: >------a---b-c-| 

答えて

6

はい、あなたがやりたいzipを使用することができます実際には

const input = Rx.Observable.from(["a", "b", "c", "d", "e"]); 
 
const signal = new Rx.Subject(); 
 
const output = Rx.Observable.zip(input, signal, (i, s) => i); 
 
output.subscribe(value => console.log(value)); 
 
signal.next(1); 
 
signal.next(1); 
 
signal.next(1);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

は、zipはバッファリングに関係this GitHub issueの例として使用されています。

あなたがバッファリングされた値がリリースされる方法の多くを決定するために、信号の放出された値を使用したい場合、あなたはこのような何か行うことができます:

const input = Rx.Observable.from(["a", "b", "c", "d", "e"]); 
 
const signal = new Rx.Subject(); 
 
const output = Rx.Observable.zip(
 
    input, 
 
    signal.concatMap(count => Rx.Observable.range(0, count)), 
 
    (i, s) => i 
 
); 
 
output.subscribe(value => console.log(value)); 
 
signal.next(1); 
 
signal.next(2); 
 
signal.next(1);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

0

windowを分離するために使用することができますタイムライン。出力を保持するにはtakeLastが使用されます。

let signal = Observable.interval(1000).take(4); 

let input = Observable.interval(300).take(10).share(); 

let output = input 
    .do(value => console.log(`input = ${value}`)) 
    .window(signal) 
    .do(() => console.log(`*** signal : end OLD and start NEW subObservable`)) 
    .mergeMap(subObservable => { 
     return subObservable.takeLast(100); 
    }) 
    .share() 

output.subscribe(value => console.log(` output = ${value}`)); 

Observable.merge(input.mapTo(1), output.mapTo(-1)) 
    .scan((count, diff) => { 
     return count + diff; 
    }, 0) 
    .subscribe(count => console.log(`   count = ${count}`)); 

結果:

22:28:37.971 *** signal : end OLD and start NEW subObservable 
22:28:38.289 input = 0 
22:28:38.292    count = 1 
22:28:38.575 input = 1 
22:28:38.576    count = 2 
22:28:38.914 input = 2 
22:28:38.915    count = 3 
      <signal received> 
22:28:38.977  output = 0 
22:28:38.979    count = 2 
22:28:38.980  output = 1 
22:28:38.982    count = 1 
22:28:38.984  output = 2 
22:28:38.986    count = 0 
22:28:38.988 *** signal : end OLD and start NEW subObservable 
22:28:39.175 input = 3 
22:28:39.176    count = 1 
22:28:39.475 input = 4 
22:28:39.478    count = 2 
22:28:39.779 input = 5 
22:28:39.780    count = 3 
      <signal received> 
22:28:39.984  output = 3 
22:28:39.985    count = 2 
22:28:39.986  output = 4 
22:28:39.988    count = 1 
22:28:39.989  output = 5 
22:28:39.990    count = 0 
22:28:39.992 *** signal : end OLD and start NEW subObservable 
22:28:40.075 input = 6 
22:28:40.077    count = 1 
22:28:40.377 input = 7 
22:28:40.378    count = 2 
22:28:40.678 input = 8 
22:28:40.680    count = 3 
22:28:40.987 input = 9 
22:28:40.990    count = 4 
      <input completed> 
22:28:40.992  output = 6 
22:28:40.993    count = 3 
22:28:40.995  output = 7 
22:28:40.996    count = 2 
22:28:40.998  output = 8 
22:28:40.999    count = 1 
22:28:41.006  output = 9 
22:28:41.007    count = 0 
関連する問題