2012-05-04 7 views
4
public interface Event 
{ 
     Guid identifier; 
     Timestamp ts; 
} 

私はReactive Extensionsを使用して金融機関の問題を書き直すことを考えています。リアクティブエクステンション(rx)を使用して時間内にイベントが列挙される

Guid(株式記号+ユニークエントロピーが埋め込まれている)、タイムスタンプ、Valueフィールドで識別されるイベントを取得することを前提としています。これらは高速であり、X秒後(10秒後)に「少なくとも」なるまでこれらのオブジェクトに対処することはできません。その後、それらを処理してシステムから削除する必要があります。

「10秒」の初期ウィンドウ(たとえばT0〜T10)の2つのウィンドウのように考えると、すべてのユニークイベント(基本的にはGUIDでグループ化)を特定し、次に「10秒 "、"二次ウィンドウ "(T10〜T20)を使用して、少なくとも10秒のポリシーを実装していることを確認します。 「初期ウィンドウ」からすべてのイベントが削除されます(これを考慮しているため)。「セカンダリウィンドウ」から、「初期ウィンドウ」で発生したものを削除します。そして、我々は10秒間スライディングウィンドウを移動し続けるので、ウィンドウT20-T30を見て、繰り返し、すすぎます。

これはRxでどのように実装できますか?これはやり方のようです。

+0

あなたはスライドウィンドウを言うが、あなたの例はジャンプウィンドウを使用しているようだ - それは? – yamen

答えて

5

メッセージのサーバークロックとタイムスタンプ(すなわち、「実生活」モード)に頼ることができる場合、は、ジャンプするのではなくスライドする10秒後になります

var guidSet = new HashSet<Guid>(); 
delayedEvents.Do(e => guidSet.Add(e.identifier)); 

あなたの場合」:ユニークなイベントなどの確認

var events = new Subject<Event>(); 
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10)); 

は、ある種のセットに追加するだけです:10秒のウィンドウ、そしてあなただけのイベント10秒遅らせることができますあなたは10秒待つ必要がありますその後、あなただけの代わりに10秒間バッファリングしたい、一度に最後の10秒を処理:

var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10)); 
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); }); 

私はそれが何をしたいのです想像できないように私は(スライド10秒のウィンドウの例を示していませんイベントは複数回処理されます)。


ここでは深刻なことがあります。ウォールタイムに頼るのではなく、イベント内の時間を使用してロジックを動かしたいとします。歴史的なスケジューラを作成し、元のものからスケジュールされたイベントを養う

public class Event 
{ 
     public Guid identifier; 
     public DateTime ts; 
} 

:イベントは次のように再定義されると仮定すると、

var scheduler = new HistoricalScheduler(); 
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts)); 
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e)); 

今、あなたは、単にtarget代わりのeventに定期的に受信コンビネータを使用することができ、かつスケジューラを通過するだけで適切にトリガされます。

var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler); 

これは簡単なテストです。秒ごとにトリガー百のイベント各「事実上」30秒間隔ではなく、リアルタイムでの作成:

var now = DateTime.Now; 
var test = Enumerable.Range(0,99).Select(i => 
    Scheduler.ThreadPool.Schedule(
     TimeSpan.FromSeconds(i), 
     () => events.OnNext(new Event() { 
      identifier = Guid.NewGuid(), 
      ts = now.AddSeconds(i * 30) 
     }) 
    ) 
).ToList(); 

は、それに登録して、バッファリングされたイベントの60秒を求める - と、実際に2つのイベントごとに2「実際の」秒を受け取ります(60バーチャル秒):

+0

yamen、これを正しく読んだら、あなたはMySchedulerでタイプミスがありますか?または、そのようなスケジューラを作成する必要がありますか? – halivingston

+0

これを正しく取得した場合、私は仮想時間内に1つのイベントをポンピングし、仮想時間で10秒間バッファを言うだけです。私は観察可能なものを決して終わらせませんよね?私はそれが本当であることを願っています。 – halivingston

+0

また、DumpLiveの権利のために、2秒で2つのイベント(60秒で予想される解約)を受け取る理由は何ですか?私が購読していれば、仮想時間で60秒目に2イベントをすぐに得ることができますか? – halivingston

関連する問題