2013-08-03 3 views
5

FileSystemWatcherChangedイベントを観測可能なシーケンスに変換するコードを記述しました。Reactive Extensionsのコードがメモリにリークしますか?

私の目標は、2つのすべてのファイルシステムを分割して、別々のストリームに変更し、それらを調整することです。

たとえば、半分の時間で3回変化する10種類のファイルがある場合、それぞれのファイルに対して1回のみ通知を受け取ります。

でも、私には関係するのは、GroupBy()演算子です。これが機能するには、時間の経過とともにグループを構築し続け、少量のメモリを消費する必要があります(私が想定している)。

これは「リーク」を引き起こしますか?その場合、どうすれば防止できますか?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") { 
    EnableRaisingEvents = true, 
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size 
}; 

void Main() 
{ 
    var fileSystemEventStream = 
     Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs> 
      (
       _ => _watcher.Changed += _, 
       _ => _watcher.Changed -= _ 
      ) 
      .ObserveOn(ThreadPoolScheduler.Instance) 
      .SubscribeOn(ThreadPoolScheduler.Instance) 
      .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath) 
      ; 

    var res = 
     from fileGroup in fileSystemEventStream 
     from file in fileGroup.Throttle(TimeSpan.FromSeconds(1)) 
     select file; 

    res.Subscribe(
     ReceiveFsFullPath, 
     exception => { 
      Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace); 
     }); 

    Console.Read(); 
} 

void ReceiveFsFullPath(string s){ 
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId); 
    Console.WriteLine(s); 
} 
+0

あなたがC#ガーナージコレクターを信じていて、そうしてはならない場合、漏れはありません –

+0

しかし、グループは同じファイルを参照しています。私はC#がガベージコレクトされていることを理解していますが、それはメモリリークが発生しないことを意味するものではありません。 –

+0

[SciTechのメモリプロファイラ](http://memprofiler.com/)などのプロファイラを付けて試してみましたか? –

答えて

3

はい、新しいキーごとに、GroupByはSubjectを作成し、これらのサブジェクトの辞書を保持します。そして、あなたはこれらのそれぞれに購読しています。だから、古いエントリを解放するために、時間の経過とともに成長する小さなメモリのチャンクです。あなたが本当に必要とするのは、スロットルタイマーが切れたときにキーを取り除くことです。私は組み込み演算子でこれを行う方法を考えることはできません。したがって、カスタム演算子が必要です。ここに1つの刺し傷があります。

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay) 
{ 
    return Observable.Create(observer => 
    { 
     var notifications = new Subject<IObservable<T>>(); 
     var subscription = notifications.Merge().Subscribe(observer); 
     var d = new Dictionary<T, IObserver<T>>(); 
     var gate = new object(); 
     var sourceSubscription = new SingleAssignmentDisposable(); 
     var subscriptions = new CompositeDisposable(subscription, sourceSubscription); 
     sourceSubscription.Disposable = source.Subscribe(value => 
     { 
      IObserver<T> entry; 
      lock(gate) 
      { 
      if (d.TryGetValue(value, out entry)) 
      { 
       entry.OnNext(value); 
      } 
      else 
      { 
       var s = new Subject<T>(); 
       var o = s.Throttle(delay).FirstAsync().Do(() => 
       { 
       lock(gate) 
       { 
        d.Remove(value); 
       } 
       }); 
       notifications.OnNext(o); 
       d.Add(value, s); 
       s.OnNext(value); 
      } 
      } 
     }, observer.OnError, notifications.OnCompleted); 

     return subscriptions; 
    }); 
} 

... 
Observable.FromEventPattern(...) 
    .Select(e => e.EventArgs.FullPath) 
    .ThrottleDistinct(TimeSpan.FromSeconds(1)) 
    .Subscribe(...); 
+1

答えを書くのに費やした時間を感謝します。私はこのスレッドをMSDNフォーラムで見た。 http://social.msdn.microsoft.com/Forums/en-US/53a66a85-4ab3-4fe9-96c6-8b72cc034d0e/groupbyuntil-usageグループサイズが最大1の期限切れのグループを回答にすることができますか?組み込みのメカニズムがある場合は、ロックを使用してコードを書くことを避けたいと思います。 –

+0

はい。私はGroupByUntilについて知らなかった!それはAPIは少し厄介ですが、これはうまくいくはずです: 'filenames.GroupByUntil(f => f、g => g.Throttle(delay))。SelectMany(g => g.LastAsync()); – Brandon

1

ブランドンの返信によれば、被験者は成長し、再利用の方法はありません。ここでのメモリリークの主な懸念は、あなたがサブスクリプションをキャプチャしていないことです!すなわち

res.Subscribe(... 

は、サブスクリプションを捕捉しない場合、あなたは、このようにあなたが「リークしたメモリを持っている、ので、あなたはイベントハンドラを解放することはありません、サブスクリプションを処分することはできません

subscription = res.Subscribe(... 

に置き換える必要があります"あなたがサブスクリプションを実際に処分していないと明らかにこれは役に立たない。

*完成したら自動的に廃棄されるので、うまくいきます。 FileDeletedイベントが発生したときにシーケンスを完了することができますか?

+0

私はサブスクリプションをキャプチャしていますが、ちょっとしたコードを追加していません。基本的にはコードベースからクエリを取り出してLINQPadをフレンドリーにしました。たとえこのクエリがアプリケーションの稼動中に実行され、サブスクリプションが終了しない場合でも、私たちは "リーク"を持っています:) –

+0

清算をありがとう。 GroupByUntilはあなたが探しているようです(シーケンスをリリースしたい条件で説明していなくても) –

+1

上記の問題の定義により、シーケンスがスロットル期間よりも長くアイドルになるたびにシーケンスを解放することができます。 'GroupByUntil'を使って、元のバージョンのコードとバージョン間の出力シーケンスに"目に見える "違いはありません。ただし、後者はアイドル時にリソースを解放します。 – Brandon

関連する問題