2

複数のスレッドから取り込むことができる要求のキューを実装する必要があります。このキューが1000個の完了した要求よりも大きくなると、この要求はデータベースに格納されます。ここに私の実装です:ConcurrentQueueからchuncksを正しく消費する方法

public class RequestQueue 
{ 
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>(); 
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>(); 

    private static volatile bool isLoading = false; 
    private static object _lock = new object(); 

    public static void Launch() 
    { 
     Task.Factory.StartNew(execute); 
    } 

    public static void Add(VerificationRequest request) 
    { 
     _queue.Add(request); 
    } 

    public static void AddRange(List<VerificationRequest> requests) 
    { 
     Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3}, 
      (request) => { _queue.Add(request); }); 
    } 


    private static void execute() 
    { 
     Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest); 
    } 

    private static void EnqueueSaveRequest(VerificationRequest request) 
    { 
     _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request)); 
     if (_storageQueue.Count > 1000 && !isLoading) 
     { 
      lock (_lock) 
      { 
       if (_storageQueue.Count > 1000 && !isLoading) 
       { 
        isLoading = true; 

        var requestChunck = new List<VerificationRequest>(); 
        VerificationRequest req; 
        for (var i = 0; i < 1000; i++) 
        { 
         if(_storageQueue.TryDequeue(out req)) 
          requestChunck.Add(req); 
        } 
        new VerificationRequestRepository().InsertRange(requestChunck); 

        isLoading = false; 
       } 
      } 
     }    
    } 
} 

ロックとisLoadingなしでこれを実装する方法はありますか?

+0

なぜロックを使用しないのですか?この場合、パフォーマンスに影響を与えないようです。 – Evk

+0

私は同意しますが、もっと良い方法があるかもしれません。また、私はisLoadingで正しくロックを実装することを確信していません – xalz

+0

なぜあなたは 'isLoading'を必要としますか?単純に削除すると何が変わるのですか? – zerkms

答えて

3

TPL Dataflowライブラリのブロックを使用するのが最も簡単な方法です。例:

var batchBlock = new BatchBlock<VerificationRequest>(1000); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       new VerificationRequestRepository().InsertRange(records); 
}; 

batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true }); 

これはそれです。

あなたの仕事を終えたら、あなたは

batchBlock.Post(new VerificationRequest(...)); 

で始まるブロックにメッセージを送ることができます、あなたはパイプライン全体をテイクダウンし、batchBlock.Complete();を呼び出すことにより、残ったメッセージをフラッシュし、最後のブロックが終了するのを待つことができます。

batchBlock.Complete(); 
await exportBlock.Completion; 

1000までBatchBlockバッチは、1000件のアレイに記録し、次のブロックに渡します。 ActionBlockでは、デフォルトで1タスクしか使用されないため、スレッドセーフです。

var repository=new VerificationRequestRepository(); 
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{ 
       repository.InsertRange(records); 
}; 

ほとんどすべてのブロックには、同時入力バッファがあります。各ブロックは独自のTPLタスクで実行されるため、各ステップは互いに同時に実行されます。つまり、パイプラインを流れるメッセージを変更するのに、TransformBlockを使用するなど、複数のリンクされたステップがある場合は、非同期で "無償で"実行できます。

このようなパイプラインを使用して、外部サービスを呼び出し、応答を解析し、最終レコードを生成し、バッチ処理し、SqlBulkCopyを使用するブロックでデータベースに送信するパイプラインを作成します。