2017-12-24 15 views
0

イベントを集約するobservableを作成しようとしていますの範囲内ですが、集計をリセットすることはできません。RxJs - 1秒ごとに集計イベントとリセットスキャン

私は集計アキュムレータにタイムスタンプを持っているので、私はウィンドウ間で集計を保持していることがわかります。

{ high: '14725.97000000', 
    low: '14106.01000000', 
    last: '14150.52000000', 
    ts: '1514089269250' } 
{ high: '17279.27000000', 
    low: '14059.87000000', 
    last: '14162.09000000', 
    ts: '1514089269250' } 

スキャンをリセットするにはどうすればよいですか?他の方法で同じことを達成するか?

答えて

0

mergeMapとの最初mergeAllscan置き換え - scanが行われ、その中に:

const btc = events 
    .filter(f => f.product_id === "BTC-USD" && f.type === "done") 
    .window(Rx.Observable.interval(1000)) 
    .mergeMap(w => w.scan((acc, i) => { 
     let price = i.price; 
     if (i.price) { 
      if (acc.high === -1 || price > acc.high) acc.high = price; 
      if (acc.low === -1 || price < acc.low) acc.low = price; 
      acc.last = price; 
     } 
     return acc; 
     }, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() }) 
    ) 
    .window(Rx.Observable.interval(1000)) 
    .map(j=> j.last()) 
    .mergeAll(); 

また、あなたがなどreduce代わりのscan、二windowを、使用している場合ではありませんreduceは最後の値のみを出力します:

const btc = events 
    .filter(f => f.product_id === "BTC-USD" && f.type === "done") 
    .window(Rx.Observable.interval(1000)) 
    .mergeMap(w => w.reduce((acc, i) => { 
     let price = i.price; 
     if (i.price) { 
      if (acc.high === -1 || price > acc.high) acc.high = price; 
      if (acc.low === -1 || price < acc.low) acc.low = price; 
      acc.last = price; 
     } 
     return acc; 
     }, { high: -1, low: -1, last: -1, ts: (new Date()).getTime().toString() }) 
    ); 
0

実際に私はあなたが正しくあなたの仕事を理解しているなら、ここでスキャンする必要はないと思う。

var openings = Rx.Observable.interval(3000); 

// Convert the window to an array 
var source = Rx.Observable.timer(0, 100) 
    .window(openings) 
    .take(3) // restrict values just for example 
    .flatMap(function (x) { 
    let arrayedValues = x.toArray(); 
    //Here you can caluculate high, low, last, ts 
    return arrayedValues; 
    }); 

source.subscribe(
(x) => console.log(x) 
FYI

plunker

+0

、 'toArray' [実装さ(https://github.com/ReactiveX/rxjs/blob/5.5.6/src/operators/toArray.ts:この例を試します#L9-L11)を使用しています。これは私の答えで使用することを提案したものです。いつものように、RxJSで何かをするには複数の方法があります:) – cartant

関連する問題