2016-08-22 11 views
1

私は3秒のたびにSystem.Timers.Timerがあります。
一度それが私のコレクションの項目をすべてを取って1つのバッチで処理したいと思います。バッチプロセッサ(キューからアイテムを集約する)

その目的は、バックエンドシステムのI/O数を減らすことです。

私は、コレクション/キューに複数の同時スレッドが追加されています。このため、私はConcurrentQueue<T>を使用することを考えましたが、それは悪い選択です。

このarticle on social msdnは、ここでの問題を非常によく説明しています。

私が必要とするのは、一度にすべてのデータを取得して(ToArray())、コレクション/キューに書き込まれたデータを失わないように1つのアトミック操作でキューをクリアできるコレクション/キューです。その間に他のスレッド。

private static void T1_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    string[] result = _queue.ToArray(); 
    _queue = new ConcurrentQueue<string>(); // strings will be lost :-) 
} 

私はシンプルQueue<T>に簡単なロックベースのアプローチを使用する傾向があります。このコードはキューに追加しようとしている任意の他の生産者をロックアウトします。もちろん、

private static void ProduceItems() 
{ 
    //while (!_stop) 
    for(int i=0; i<int.MaxValue; i++) 
    { 
     if (_stop) break; 

     lock (_myLock) // bad. locks out other producers running on other threads. 
     { 
      Console.WriteLine("Enqueue " + i); 
      _queue.Enqueue("string" + i); 
     } 

     Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY 
    } 
} 

private static readonly object _myLock = new object(); 

private static void T1_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    string[] result; 
    lock (_myLock) 
    { 
     result = _queue.ToArray(); 
     _queue.Clear(); 
    } 
} 

今のコードのこの作品はプロデューサーのコードで見ることができる1つの明らかな欠陥を持っています。「T1_Elapsed」ロックが設定されている場合にのみ、プロデューサのロックを検証できる方法はありますか?

さらに私の問題に適していますか?多分何か観察できる?または、良い「バッチャー/アグリゲーター」の例がありますか?

UPDATE 1:あなたはRXで何ができるか恐ろしいRX
:)
私はまだ私は、このシナリオでは、エラー、再試行または再エンキューを扱うことができるかに探しています。

internal class Rx 
{ 
    internal static void Start() 
    { 
     ISubject<int> subject = new Subject<int>(); 
     ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED! 

     var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10) 
      .Subscribe((item) => ProcessBatch(item)); 

     for (int i=1; i<int.MaxValue; i++) 
     { 
      syncedSubject.OnNext(i); 
      Thread.Sleep(200); 
      Console.WriteLine($"Produced {i}."); 
     } 

     Console.ReadKey(); 
     subscription.Dispose(); 
    } 

    private static void ProcessBatch(IList<int> list) 
    { 
     // Aggregate many into one 
     string joined = string.Join(" ", list); 

     // Process one 
     Console.WriteLine($"Wrote {joined} to remote storage."); 

     // how do you account for errors here? 
     myProducer.ReEnqueueMyFailedItems(list); // ? 
    } 
} 

答えて

1

TPLデータフロー

私はTPLデータフローのライブラリに行くを与えると思います。これは、タスクパラ呼びライブラリをベースにしており、並行性が大きな役割を果たしているこれらの種類の要件に対応するように設計されています。このライブラリに関する一連のブログ記事については、http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.htmlを参照してください。

BatchBlockは、シナリオに適しているようです。チュートリアルについては、https://msdn.microsoft.com/en-us/library/hh228602(v=vs.110).aspxを参照してください。

BatchBlockを使用してのもう一つの例: https://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/

の代わりに使用可能なTPLデータフローブロックの1つに掲載する予定キューにデータを掲載。

別のオプションは良い導入のため

反応性拡張

参照http://www.introtorx.com/uat/content/v1.0.10621.0/01_WhyRx.htmlを使用することができ

それだけでなく、バ​​ッチ処理のサポートを提供しています。上記の例で

void Sample() 
{ 
    var dataprovider = new Subject<int>(); 

    var subscription = dataprovider 
     .Buffer(TimeSpan.FromMinutes(3)) 
     .Subscribe(listOfNumbers => 
     { 
      // do something with batch of items 
      var batchSize = listOfNumbers.Count; 
     }); 

    for(int i = 0; i <= 5; ++i) 
    { 
     dataprovider.OnNext(i); 
    } 

    subscription.Dispose(); 
} 

を、異なる複数のプロデューサを有効にするためにいくつかの変更が必要ですスレッドを追加するには、reactive extension OnNextを参照してください。これは簡略化されたコード(!)ですが、RXを使用するコンセプトの一般的な考え方を示しています。

バッファリングは、最大バッファサイズ、所定の時間枠または両方の組み合わせを使用して実行できます。だからあなたのタイマーを置き換えることもできます。

の代わりにあなたはそれがその痛みからあなたを解放しますので、クリアする必要がキューまたは同様のものを使用することを排除するSubject

TPLデータフローとRXの両方でOnNextを呼び出すキューに項目を追加します。

+0

私はそこに何かがあることを知っていました;-)私はRX拡張機能に本当に感銘を受けました。あなたの例は本当に助けになりました。今私はどのような再試行/失敗メカニズムがこれに収まるか把握する必要があります。 – lapsus

+0

それらを別のRXストリームに送信し、その特定のサブスクライブアクションでそれらを処理することができます。ビルドインのエラー処理戦略もありますが、失敗した場合に必要なアクションの種類によっては少し異なります。 –

関連する問題