2011-10-28 15 views
1

私は、observableの最新の20個の値を取得し、ブロッキングが発生することなくプロパティとして公開しようとしています。 MostRecentBarsゲッターが呼ばれるしかしIObservable TakeLast(n)and blocking

class Foo 
{ 
    private IObservable<int> observable; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return this.observable.TakeLast(20).ToEnumerable(); 
     } 
    } 
} 

、少なくとも20個の観測値があるまでToEnumerableは戻らないだろうと思われるので、これは、ブロックされています。現時点では、私のコードは次のようになります。

ブロックすることなく観測値の最新の値を最大20個まで公開する組み込みの方法がありますか?観測値が20未満の場合は、すべての値を返す必要があります。

+0

IObservable ojlovecd

答えて

0

あなたの要件に合った組み込みのRx演算子は考えられません。

class Foo 
{ 
    private IObservable<int> observable; 
    private Queue<int> buffer = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 

     this.observable 
      .Subscribe(item => 
      { 
       lock (buffer) 
       { 
        if (buffer.Count == 20) buffer.Dequeue(); 
        buffer.Enqueue(item); 
       } 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (buffer) 
      { 
       return buffer.ToList();  // Create a copy. 
      } 
     } 
    } 
} 
2

私はあなたに2つの選択肢を与えます。 1つはRx Scanオペレータを使用していますが、私はそれを読むのが少し複雑になると思います。もう1つはロック付き標準Queueを使用しています。選んでいいですよ。

(1)

class Foo 
{ 
    private int[] bars = new int[] { }; 

    public Foo(IObservable<int> bar) 
    { 
     bar 
      .Scan<int, int[]>(
       new int[] { }, 
       (ns, n) => 
        ns 
         .Concat(new [] { n, }) 
         .TakeLast(20) 
         .ToArray()) 
      .Subscribe(ns => bars = ns); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return bars; 
     } 
    } 
} 

(2)

class Foo 
{ 
    private Queue<int> queue = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(n => 
     { 
      lock (queue) 
      { 
       queue.Enqueue(n); 
       if (queue.Count > 20) 
       { 
        queue.Dequeue(); 
       } 
      } 
     }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (queue) 
      { 
       return queue.ToArray(); 
      } 
     } 
    } 
} 

私はこれらの助けを願っています。

+0

におけるいわゆるTakeLast方法は今Ilian Pinzonはかなり私は思い同じ 'Queue'の実装を書いたことを見た後ノーありますあなたの最善の策としてそのアプローチにしてください。私たちのソリューションがどれくらい近いのかは驚くべきことです。私は私の記事を投稿した後まで彼を見なかった。それにもかかわらず、 'Scan'演算子は見落とされがちですが、多くの状況で非常に便利です。 – Enigmativity

+0

私は実際に 'Scan'を使って最初のアプローチを好む、それは"最後に(最高)nを取る "を超えて観察可能なチェーンを続けることが可能になります。実際には、 'var recentAverageScore = scores.TakeUpToLast(25).Average();'などの拡張メソッド、あるいは 'IObservable 'の結果で何をしたいのかを、拡張メソッドに変えることは素晴らしいことです。 –

0

すでにあなたの答えを持っているが、私はバッファを使用してこのリプレイ件名を解決することを考えと同じような何かを思い付いた:これはあなたの問題に収まるなら、私を知ってみましょう

class Foo 
{ 
    private ReplaySubject<int> replay = new ReplaySubject<int>(20); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(replay); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      var result = new List<int>(); 
      replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread 
      return result; 
     } 
    } 
} 

1

私は私が反応性の拡張機能を使用してビルド任意のプロジェクトにアタッチする傾向があり、いくつかの拡張機能を持って、そのうちの一つは、スライディングウィンドウです:これは、最新のN個のアイテムの配列を返す

public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length) 
{ 
    Queue<T> window = new Queue<T>(); 

    return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) => 
    { 
     window.Enqueue(b); 
     if (window.Count > length) 
      window.Dequeue(); 
     return window.ToArray(); 
    }); 
} 

(またはN個のアイテムがまだ存在しない場合は、より少ない)。

あなたのケースでは、あなたが行うことができるはずは:

class Foo 
{ 
    private IObservable<int> observable; 
    private int[] latestWindow = new int[0]; 

    IDisposable slidingWindowSubscription; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
     slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a => 
      { 
       latestWindow = a; 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return latestWindow; 
     } 
    } 
}