Reactive Extensionsを使用して、一部のメッセージを変換し、わずかな遅延の後に中継したいと考えています。Reactive Extensions(Rx)を使用した遅延と重複排除
メッセージは次のようになり:
- 遅延の長さ:
class OutMsg { int GroupId { get; set; } string Content { get; set; } OutMsg(InMsg in) { GroupId = in.GroupId; Content = Transform(in.Content); // function omitted } }
が要件のカップルがあります。
class InMsg { int GroupId { get; set; } int Delay { get; set; } string Content { get; set; } }
出力は次のようになりますメッセージの内容に依存します。
- 各メッセージにGroupIdがある
- 新しいメッセージが送信待ちの遅延メッセージと同じGroupIdで受信された場合は、最初のメッセージを削除し、新しい遅延期間後に2番目のメッセージのみを送信する必要があります。
考えると、観察可能<InMsg>と送信機能:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
私は、変換を実行するために選択を使用できることを理解しています。私は、メッセージが遅れる指定適用することができますどのように
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- ? (メッセージの順序が乱れる可能性があるので注意してください)
- 同じGroupIdでメッセージの重複を除去するにはどうすればよいですか?
- Rxはこの問題を解決できますか?
- これを解決する別の方法がありますか?
私はこれで遊んできましたが、それは私が期待していたことを全くしません。サブスクリプションは "System.Collections.Generic.AnonymousObservable'1 [OutMsg]"を受信します – chillitom
あなたは 'Switch'を呼び出さないようです。Visual Studioで「選択」にマウスを重ねると、 'IObservable'が返されます。 'IObservable >'が返された場合は、Switch –