2012-04-20 14 views
4

複数の(2〜10)個の観測値があり、それぞれが指定された時間ウィンドウで1つのイベントを発生させます。複数の観測値の順序を同期させる

私は3つの科目を持っていると言うと、彼らは任意の順序を次のように発射することを説明することができます:

Subjects: sA, sB, sC 

Order of time window 1: 
    sB, then sA, then sC 
Order of time window 2: 
    sA, then sC, then sB 
Order of time window 3: 
    sA, then sB, then sC 
etc. 

をだから私は、私は、ストリームを次のようになるだろうマージを行うならば:SB、SA、SC、SA、私は、時間ウィンドウ内でイベントの順序を強制したいと思っています。マージされたストリームを以下につながる

1. sA 
2. sB 
3. sC 

:SA、SB、SC、SA、SB、SC、SA、SB、SCなど

ウィンドウ時間自体は知られているが、各以来されていませんイベントは各ウィンドウで一度だけ発生します。すべてのサブジェクトが起動すると、ウィンドウは閉じられ、新しいウィンドウが開きます。

どのようにすればいいのですか?

拡張質問(重要ではない):観測値は互いに独立しているので、共通の静的メソッドを導入する必要があります。ここで、各観測値の順序を同期できます。

public static IObservable<T> SynchronizeOrder<T>(IObservable<T> source, int order) 
{ 
    //return synchronized source 
} 

更新:私の元の質問には明確ではありませんでした何かが観測は、異なるイベントタイプのものとすることができるということでした(私は私の例では、特定のイベントデータタイプを使用していない理由です)、その結果、私は順序付けられた観測値をマージしたくありません。私は、独立したオブザーバブルがその順序で同期していることを保証できる仕組みを作りたいと思っています。

myEventStream 
.ObserveOn(TaskFactory) 
.DoSomeExpensiveComputation() 
.Order(2) //doesn't have to be an extension method 
.Subscibe(computationResult=> 
      sharedRessourceWhereOrderMatters.Update(computationResult)) 
+0

? 'sA'は各ウィンドウを閉じますが、何が開かれているか分からずに、' sB'と 'sC'をどれだけ長く保持するか分かりません。 – yamen

+0

良い点、私の質問は十分にはっきりしていない、それを更新しました。 – lukebuehler

答えて

3

これは、あなたが観測可能な結合使って何をしたいん:

using System.Reactive; 
    using System.Reactive.Subjects; 
    using System.Reactive.Linq; 

    var a = new Subject<int>(); 
    var b = new Subject<int>(); 
    var c = new Subject<int>(); 

    var test = 
     Observable.When(
      a.And(b).And(c).Then((a1, b1, c1) => new int[] { a1, b1, c1 }) 
     ).SelectMany(arr => arr.ToObservable()); 

    var sub = test.Subscribe(val => Console.Write("{0} ", val)); 

    a.OnNext(1); 
    b.OnNext(2); 
    c.OnNext(3); 

    a.OnNext(1); 
    c.OnNext(3); 
    b.OnNext(2); 

    c.OnNext(3); 
    a.OnNext(1); 
    b.OnNext(2); 

    Console.ReadKey(); 
    sub.Dispose(); 

出力は次のとおりです。

1 2 3 1 2 3 1 2 3 

お知らせがあること:これは提供されません

  1. あなたにできる拡張メソッドいくつかの順序で特定の観測可能なものを注入する。 And(これは少しずつ行うことができます)を使用して計画を作成し、最終的にそれを実行する必要があります。 「ビルドアップ」オーダーが出力オーダーになります。

  2. これは実際のウィンドウの作成を見ません。しかし、観察可能な鎖のどこかに注入できない理由はありません。

  3. この解決策は、3つの観測値のすべてが値を持つたびにに発射します。あなたはそれを設定し、正しいポイントでそれを引き裂いて、望ましい動作を得る必要があるかもしれません。

+0

問題を考えてもらえましたが、私は別の解決策を使用してしまいました。 – lukebuehler

1

yamensの回答は、同じ種類の同じ種類の観測値の場合に最適です。私のケース(拡張質問)では、オブザーバブルは異なるタイプにすることができ、実行の任意の時点で順序を同期させることができます。

だからここに私の提案シンプルなソリューションです:各ウィンドウが開きます何

private readonly object syncRoot = new object(); 
private int sourceCount; 
private readonly SortedDictionary<int, Action> buffer = new SortedDictionary<int, Action>(); 

public IObservable<T> Order<T>(IObservable<T> source, int order) 
{ 
    return Observable.Create<T>(observer => 
    { 
     lock (syncRoot) 
      sourceCount++; 

     source.Synchronize(syncRoot).Subscribe(item => 
     { 
      buffer[order] =() => observer.OnNext(item); 
      if(buffer.Count == sourceCount) 
      { 
       foreach (var action in buffer.Values) 
        action(); 
       buffer.Clear(); 
      } 
     }, observer.OnError, observer.OnCompleted); 

     return() => 
     { 
      lock (syncRoot) 
       sourceCount--; 
     }; 
    }); 
}//end order 
+0

私はあなたが何を意味するのかを見ます:異なる時に同期しますが、ここで観察可能なもののタイプは同じです。結局、観測可能なストリームをマージする場合、最後に単一の統一タイプが必要です。私の答えはそれを提供することができます(必要に応じて 'Then'で別の選択関数を使用してください)。あなたのソリューションは適切なアイデアを持っていますが、例外的なシナリオでは脆弱になります(例えば、1回の観測で2回以上発生します)。 – yamen

+0

はい、私は最終的に合併が必要であるという誤った印象を与えたと思いますが、結果の順序を説明したかっただけです。私の場合は、同じデータストリームを非同期的に扱う複数の独立したコンポーネント(ほとんどプラグインに似ています)があります。たとえば、いくつかの計算があります。彼らはシングルトンやサービスを共有しています。シングルトンやサービスでは、計算の特定のポイントの後にオブザーバブルの継続の順序を同期させることができます。いくつかのシナリオではコンポーネントに優先順位がありますが、コンポーネント開発者に任せたいからです。 – lukebuehler

+0

ああ、それは理にかなっています。運が良かった。 – yamen

関連する問題