2011-01-19 11 views
7

Reactive Extensionsを使用して、一部のメッセージを変換し、わずかな遅延の後に中継したいと考えています。Reactive Extensions(Rx)を使用した遅延と重複排除

メッセージは次のようになり:

  • 遅延の長さ:

    class OutMsg 
    { 
        int GroupId { get; set; } 
        string Content { get; set; } 
        OutMsg(InMsg in) 
        { 
         GroupId = in.GroupId; 
         Content = Transform(in.Content); // function omitted 
        } 
    } 
    

    が要件のカップルがあります。

    class InMsg 
    { 
        int GroupId { get; set; } 
        int Delay { get; set; } 
        string Content { get; set; } 
    } 
    

    出力は次のようになりますメッセージの内容に依存します。

  • 各メッセージにGroupIdがある
  • 新しいメッセージが送信待ちの遅延メッセージと同じGroupIdで受信された場合は、最初のメッセージを削除し、新しい遅延期間後に2番目のメッセージのみを送信する必要があります。

考えると、観察可能<InMsg>と送信機能:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

私は、変換を実行するために選択を使用できることを理解しています。私は、メッセージが遅れる指定適用することができますどのように

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • ? (メッセージの順序が乱れる可能性があるので注意してください)
  • 同じGroupIdでメッセージの重複を除去するにはどうすればよいですか?
  • Rxはこの問題を解決できますか?
  • これを解決する別の方法がありますか?

答えて

7

あなたは出力を遅らせるために、そしてSwitchを確認してください新しい値は、そのグループ内の前の値を置換するためにIGroupedObservableDelayを作るためにGroupByを使用することができます。

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

実装上の注意:もしグループ化された値は、メッセージが(すなわち遅延の後に)、それは

+0

私はこれで遊んできましたが、それは私が期待していたことを全くしません。サブスクリプションは "System.Collections.Generic.AnonymousObservable'1 [OutMsg]"を受信します – chillitom

+0

あなたは 'Switch'を呼び出さないようです。Visual Studioで「選択」にマウスを重ねると、 'IObservable 'が返されます。 'IObservable >'が返された場合は、Switch –

0

がRxのフレームワークはを使用して遅延を解消し、新たな遅延が開始されます送信され後に到着しました210拡張メソッド。重複排除によるキューイングは、遅延後に通常のLINQソートを適用してからDistinctUntilChangedを実行することで解決できます。

更新:私は、ここでの遅延アプローチは単独では機能しません。あなたは何らかの理由で、遅れている到着メッセージを待ち行列に入れる必要があります。これは、拡張メソッドBufferWithTimeによって実現されます。このメソッドは、メッセージのリストを返します。メッセージのリストは、次のオブザーバーにライン上でパブリッシュする前に、重複してピールすることができます。