メッセージのサーバークロックとタイムスタンプ(すなわち、「実生活」モード)に頼ることができる場合、とは、ジャンプするのではなくスライドする10秒後になります
var guidSet = new HashSet<Guid>();
delayedEvents.Do(e => guidSet.Add(e.identifier));
あなたの場合」:ユニークなイベントなどの確認
var events = new Subject<Event>();
var delayedEvents = events.Delay(TimeSpan.FromSeconds(10));
は、ある種のセットに追加するだけです:10秒のウィンドウ、そしてあなただけのイベント10秒遅らせることができますあなたは10秒待つ必要がありますその後、あなただけの代わりに10秒間バッファリングしたい、一度に最後の10秒を処理:
var bufferedEvents = events.Buffer(TimeSpan.FromSeconds(10));
bufferedEvents.Do(es => { foreach (var e in es) guidSet.Add(e.identifier); });
私はそれが何をしたいのです想像できないように私は(スライド10秒のウィンドウの例を示していませんイベントは複数回処理されます)。
ここでは深刻なことがあります。ウォールタイムに頼るのではなく、イベント内の時間を使用してロジックを動かしたいとします。歴史的なスケジューラを作成し、元のものからスケジュールされたイベントを養う
public class Event
{
public Guid identifier;
public DateTime ts;
}
:イベントは次のように再定義されると仮定すると、
var scheduler = new HistoricalScheduler();
var driveSchedule = events.Subscribe(e => scheduler.AdvanceTo(e.ts));
var target = events.SelectMany(e => Observable.Timer(e.ts, scheduler).Select(_ => e));
今、あなたは、単にtarget
代わりのevent
に定期的に受信コンビネータを使用することができ、かつスケジューラを通過するだけで適切にトリガされます。
var bufferedEvents = target.Buffer(TimeSpan.FromSeconds(10), scheduler);
これは簡単なテストです。秒ごとにトリガー百のイベント各「事実上」30秒間隔ではなく、リアルタイムでの作成:
var now = DateTime.Now;
var test = Enumerable.Range(0,99).Select(i =>
Scheduler.ThreadPool.Schedule(
TimeSpan.FromSeconds(i),
() => events.OnNext(new Event() {
identifier = Guid.NewGuid(),
ts = now.AddSeconds(i * 30)
})
)
).ToList();
は、それに登録して、バッファリングされたイベントの60秒を求める - と、実際に2つのイベントごとに2「実際の」秒を受け取ります(60バーチャル秒):
あなたはスライドウィンドウを言うが、あなたの例はジャンプウィンドウを使用しているようだ - それは? – yamen