私は3秒のたびにSystem.Timers.Timer
があります。
一度それが私のコレクションの項目をすべてを取って1つのバッチで処理したいと思います。バッチプロセッサ(キューからアイテムを集約する)
その目的は、バックエンドシステムのI/O数を減らすことです。
私は、コレクション/キューに複数の同時スレッドが追加されています。このため、私はConcurrentQueue<T>
を使用することを考えましたが、それは悪い選択です。
このarticle on social msdnは、ここでの問題を非常によく説明しています。
私が必要とするのは、一度にすべてのデータを取得して(ToArray())、コレクション/キューに書き込まれたデータを失わないように1つのアトミック操作でキューをクリアできるコレクション/キューです。その間に他のスレッド。
private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
string[] result = _queue.ToArray();
_queue = new ConcurrentQueue<string>(); // strings will be lost :-)
}
私はシンプルQueue<T>
に簡単なロックベースのアプローチを使用する傾向があります。このコードはキューに追加しようとしている任意の他の生産者をロックアウトします。もちろん、
private static void ProduceItems()
{
//while (!_stop)
for(int i=0; i<int.MaxValue; i++)
{
if (_stop) break;
lock (_myLock) // bad. locks out other producers running on other threads.
{
Console.WriteLine("Enqueue " + i);
_queue.Enqueue("string" + i);
}
Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
}
}
:
private static readonly object _myLock = new object();
private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
string[] result;
lock (_myLock)
{
result = _queue.ToArray();
_queue.Clear();
}
}
今のコードのこの作品はプロデューサーのコードで見ることができる1つの明らかな欠陥を持っています。「T1_Elapsed」ロックが設定されている場合にのみ、プロデューサのロックを検証できる方法はありますか?
さらに私の問題に適していますか?多分何か観察できる?または、良い「バッチャー/アグリゲーター」の例がありますか?
UPDATE 1:あなたはRXで何ができるか恐ろしいRX
:)
私はまだ私は、このシナリオでは、エラー、再試行または再エンキューを扱うことができるかに探しています。
internal class Rx
{
internal static void Start()
{
ISubject<int> subject = new Subject<int>();
ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!
var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
.Subscribe((item) => ProcessBatch(item));
for (int i=1; i<int.MaxValue; i++)
{
syncedSubject.OnNext(i);
Thread.Sleep(200);
Console.WriteLine($"Produced {i}.");
}
Console.ReadKey();
subscription.Dispose();
}
private static void ProcessBatch(IList<int> list)
{
// Aggregate many into one
string joined = string.Join(" ", list);
// Process one
Console.WriteLine($"Wrote {joined} to remote storage.");
// how do you account for errors here?
myProducer.ReEnqueueMyFailedItems(list); // ?
}
}
私はそこに何かがあることを知っていました;-)私はRX拡張機能に本当に感銘を受けました。あなたの例は本当に助けになりました。今私はどのような再試行/失敗メカニズムがこれに収まるか把握する必要があります。 – lapsus
それらを別のRXストリームに送信し、その特定のサブスクライブアクションでそれらを処理することができます。ビルドインのエラー処理戦略もありますが、失敗した場合に必要なアクションの種類によっては少し異なります。 –