2012-05-09 8 views
1

これはかなり高いレベルの質問になります。 私は大きなデータセットを持ち、各行を(独立して)検証する必要があります。 Parallel.Foreachを使用して、その行で検証メソッドを呼び出したいとします。このメソッドがスレッドセーフであると仮定します。 検証でエラーが発生した場合は、このエラーが発生したデータ行を更新する必要があります。 私は明らかにバックグラウンドスレッドからこれを行うことはできません。しかし、私はこのエラー処理を実装する最良の方法が何であるかはわかりません。 これを実装する私の考えは、行IDとエラーを書き込みのためにスレッドセーフなので、BlockingCollectionに格納することです。その後、私は常にバックグラウンドスレッドからポーリングし、バックグラウンドスレッドがデータを見つけたら、フォームを呼び出して現在の行を更新します。リアクティブ・フレームワークを使用してスレッドセーフ・コレクションを観察する

Reactive Frameworkを使用すると、これを行う簡単な方法があるのでしょうか?基本的には、複数のプロデューサのスレッドセーフなコレクションが必要です。新しい値がコレクションに追加されたときに '監視'され、 'OnNext'がメインスレッドで実行されます。これは可能ですか?理想的には、このようなことが発生する頻度を制御することもできます(メインスレッドのコールバックが複数の行を更新するように2〜3秒ごとに行うなど)ので、常にメインスレッドを呼び出すわけではありません。

お時間をいただきありがとうございます。

+0

[BlockingCollection](http://msdn.microsoft.com/en-us/library/dd267312.aspx)をポーリングする必要はありません。 [Take Method](http://msdn.microsoft.com/en-us/library/dd287085.aspx)は、アイテムが利用できるようになるまでブロックします。 – dtb

答えて

2

これはあなたの空想打つん方法:

IObservable<bool> ValidateAsync(Row item) 
{ 
    return Observable.Start(() => { 
     // TODO: Figure out if the row is valid 
     return true; 
    }, Scheduler.TaskPoolScheduler); 
} 

myBigDataTable.ToObservable() 
    .Select(x => ValidateAsync(x).Select(y => new { Row = x, IsValid = y })) 
    .Merge(10 /* rows concurrently */) 
    .ObserveOn(SynchronizationContext.Current /*assuming WinForms */) 
    .Subscribe(x => { 
     Console.WriteLine("Row {0} validity: {1}", x.Row, x.IsValid); 
    }); 

ノーロック、無愚かなコンテナ、ブロッキングなし、安全な100%の糸を。

+0

これは優秀に見える、ありがとう! – user981225

+0

小さな質問 - DataTableオブジェクトをObservableに変換するための拡張がありますか? – user981225

+0

一般に、IEnumerableはIObservableに変換できますが、DataTableにはあまり慣れていません。あなたが 'System.Reactive.Linq'に住みたい拡張メソッドのほとんどは –

関連する問題