2012-04-02 3 views
7

申し訳ありませんが、タイトルがあまり明確でない場合、私は何も考えられませんでした...Rxでは、一定期間後に最新のアイテムをグループ化する方法は?

私はIObservable<char>の形式でユーザー入力を受け取りました。 IObservable<char[]>のように、ユーザーが入力を1秒以上停止するたびに文字をグループ化します。次のように入力するのであれば、例えば、:

h 
e 
l 
l 
o 
(pause) 
w 
o 
r 
l 
d 
(pause) 
! 
(pause) 

私があることが観測可能な出力が欲しい:

['h', 'e', 'l', 'l', 'o'] 
['w', 'o', 'r', 'l', 'd'] 
['!'] 

私は解決策は非常に単純である疑いがあるが、私は見つけることができません適切なアプローチ...私はBufferGroupByUntilThrottleなどを使用しようとしましたが、役に立たなくなりました。

ご迷惑をおかけします。


編集:私はほとんど働く何かを持っている:

 _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1))) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

をしかし、私は新しい文字が入力されるたびにリセットされる遅延を必要とする...

答えて

7

BufferThrottleは、十分だろう。ホットにするには、.Publish().RefCount()を使用して、ソースへのサブスクリプションが1つだけになるようにします。

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source, 
               TimeSpan dueTime) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    //defer dueTime checking to Throttle 
    var hot = source.Publish().RefCount(); 
    return hot.Buffer(() => hot.Throttle(dueTime)); 
} 
+0

ありがとう、それは素晴らしい作品です。私のソリューションよりはるかにエレガントです。実際、私のソースはすでに暑いです(それは 'Subject 'です)。 'Publish()。RefCount()'の使用による影響が... –

+0

@ThomasLevesqueあなたのソースがすでに熱くなっていれば、Publish/RefCountは無駄なラッパー層に過ぎないと思います。これを一般的な関数として使用したい場合は、アプリケーションに問題があることが示されていない限り、そのまま残しておきます。これを一度だけ使用する場合は、パラメータを 'hotSource'に変更し、ドキュメンテーションコメントにメモを残して、Publish/RefCountを安全に削除する必要があります。 –

0

OK、私は見つかった解決策:タイムアウトが発生するたびに

 Func<IObservable<char>> bufferClosingSelector = 
      () => 
      _input.Timeout(TimeSpan.FromSeconds(1)) 
        .Catch(Observable.Return('\0')) 
        .Where(i => i == '\0'); 
     _input.Buffer(bufferClosingSelector) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

は基本的には、bufferClosingSelectorは、WHIの何かをプッシュchは現在のバッファを閉じます。おそらくもっとシンプルでエレガントな方法ですが、それはうまくいきます...もっと良い提案には私がついています;)

0

私はあなたの後に何かをするために何度かエクステンションを書いた - BufferWithInactivityです。

ここにある:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

ありがとうございました!しかし、それは私の解決策よりもずっと単純ではありません;) –

0

これは動作するはず。それはあなたのソリューションほど簡潔ではありません。拡張メソッドの代わりにクラスを通してロジックを実装するので、それを行うより良い方法かもしれません。要するに:charが得られるたびに、それをListに追加し、(再)1秒で期限切れになるタイマーを開始します。タイマーが切れると、Listを配列として加入者に通知し、次回の準備ができるように状態をリセットします。あなたのソースが高温になっている場合

class Breaker : IObservable<char[]>, IObserver<char> 
    { 
     List<IObserver<char[]>> observers = new List<IObserver<char[]>>(); 
     List<char> currentChars; 
     DispatcherTimer t; 
     public Breaker(IObservable<char> source) 
     { 
      source.Subscribe(this); 
      t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) }; 
      t.Tick += TimerOver; 
      currentChars = new List<char>(); 
     } 
     public IDisposable Subscribe(IObserver<char[]> observer) 
     { 
      observers.Add(observer); 
      return null; //TODO return a useful IDisposable 
     } 
     public void OnCompleted() 
     { 
      //TODO implement completion logic 
     } 
     public void OnError(Exception e) 
     { 
      //TODO implement error logic 
     } 
     public void OnNext(char value) 
     { 
      currentChars.Add(value); 
      t.Start(); 
     } 
     void TimerOver(object sender, EventArgs e) 
     { 
      char[] chars = currentChars.ToArray(); 
      foreach (var obs in observers) 
       obs.OnNext(chars); 
      currentChars.Clear(); 
      t.Stop(); 
     } 
    } 
関連する問題