2016-10-27 26 views
1

rxjs5では、Throttlerクラスを実装しようとしています。キュークリアを使用したRxJSの無制限レート制限

import Rx from 'rxjs/rx'; 

export default class Throttler { 
    constructor(interval) { 
    this.timeouts = []; 
    this.incomingActions = new Rx.Subject(); 
    this.incomingActions 
     .concatMap(action => Rx.Observable.just(action).delay(interval/2)) 
     .subscribe(action => action()); 
    } 

    clear() { 
    // How do I do this? 
    } 

    do(action) { 
    this.incomingActions.next(action); 
    } 
} 

次不変量を保持しなければならない:doに渡されたすべてのアクションがアクションキュー

  • に追加さ

    • アクションキューが順序及び一定の間隔で処理されます決定されるようにコンストラクタパラメータで

    • を使用してアクションキューをクリアすることができます。

    私の現在の実装では、上記のように固定間隔が処理されますが、キューのクリア方法はわかりません。また、キューが空であっても、すべてのアクションがinterval/2ミリ秒遅れるという問題があります。

    P.S.私がインバリアントを記述する方法は、setIntervalと配列をキューにした実装に非常に簡単にマップできますが、Rxでこれをどうやって行うのでしょうか。

  • 答えて

    0

    これは、デフォルトのSubjectクラスのための良い場所ではないようです。あなた自身のサブクラスでそれを拡張することは、リストに挙げられた理由のために、より良いでしょう。

    しかし、あなたのケースで私はいくつかの指標で.do(action)方法に来る各アクションを識別し、キャンセルとしてマークされているもののインデックスのためのいくつかの配列を確認することで、特定のアクションを取り消すことができるようにsubscribe().filter()オペレータを追加しようと思います。 concatMap()を使用しているので、アクションは常に追加された順に呼び出されます。その後、clear()メソッドを使用すると、配列内でキャンセルされるすべてのアクションがマークされます。

    の後に.do()オペレータを追加し、アキュムレータを使用して現在スケジュールされているアクションの数を把握することもできます。アクションを追加すると、scheduledAction++が発生し、.do()の直前は.subscribe()となり、scheduledAction--となります。次に、この変数を使用して、.delay(interval/2)で新しいアクションを連鎖するかどうかを決定できます。