8

これはすべてWindowsサービスで発生しています。タスク並列ライブラリを使用して一度にn個のアイテムのみを処理する

処理待ちのアイテムを保持するQueue<T>(実際にはConcurrentQueue<T>)があります。しかし、私は一度に1つしか処理したくないので、n個のアイテムを同時に処理したい、nは設定可能な整数です。

タスク並列ライブラリを使用してこれを行うにはどうすればよいですか?

私はTPLが同時処理のために開発者に代わってコレクションを分割することを知っていますが、それが後の機能であるかどうかはわかりません。私はマルチスレッドとTPLには新しいです。

答えて

4

代わりConcurrentQueue<T>の使用BlockingCollection<T>、あなたは消費者の任意の数のスレッドを開始し、BlockingCollectionTake方法を使用することができます。コレクションが空の場合は、Takeメソッドが自動的に呼び出し元スレッドでアイテムの追加を待ってブロックします。そうしないと、スレッドはすべてのキューアイテムを並列に消費します。しかし、あなたの質問がTPLの使用を述べたので、BlockingCollectionチェックthisの投稿を使用すると、Parallel.ForEachにいくつかの問題があることが判明しました。そのため、消費者スレッドの作成を自分自身で管理する必要があります。 new Thread(/*consumer method*/)またはnew Task() ...

+0

BlockingCollectionは、キューの目的を無効にします。私はそれが反復されている間、ブロッキングコレクションからアイテムを削除することはできません。 –

+2

[GetConsumingEnumerable](http://msdn.microsoft.com/en-us/library/dd287186.aspx)を使用することはできません。 'foreach(Item item in _collection.GetConsumingEnumerable())'のように、コレクションが空の場合にアイテムが追加されるのを待ってブロックします。 –

4

ここでは、TaskFactoryの拡張メソッドを作成する方法があります。

public static class TaskFactoryExtension 
{ 
    public static Task StartNew(this TaskFactory target, Action action, int parallelism) 
    { 
     var tasks = new Task[parallelism]; 
     for (int i = 0; i < parallelism; i++) 
     { 
      tasks[i] = target.StartNew(action); 
     } 
     return target.StartNew(() => Task.WaitAll(tasks)); 
    } 
} 

あなたの呼び出しコードは次のようになります。

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
var task = Task.Factory.StartNew(
() => 
    { 
    T item; 
    while (queue.TryDequeue(out item)) 
    { 
     ProcessItem(item); 
    } 
    }, n); 
task.Wait(); // Optionally wait for everything to finish. 

Parallel.ForEachを使用する別のアイデアがあります。このアプローチの問題点は、並列度が必ずしも高く評価されない可能性があることです。あなたは、絶対量ではなく、許容される最大量のみを示しています。

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, 
    (item) => 
    { 
    ProcessItem(item);  
    }); 
1

私はまた、直接ConcurrentQueueを使用するのではなく、BlockingCollectionを使用してお勧めします。コンストラクタでmaxConcurrentが同時に処理されますどのように多くの要求を定義していること

public class QueuingRequestProcessor 
{ 
    private BlockingCollection<MyRequestType> queue; 

    public void QueuingRequestProcessor(int maxConcurrent) 
    { 
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent); 

    Task[] consumers = new Task[maxConcurrent]; 

    for (int i = 0; i < maxConcurrent; i++) 
    { 
     consumers[i] = Task.Factory.StartNew(() => 
     { 
     // Will wait when queue is empty, until CompleteAdding() is called 
     foreach (var request in this.queue.GetConsumingEnumerable()) 
     { 
      Process(request); 
     } 
     }); 
    } 
    } 

    public void Add(MyRequest request) 
    { 
    this.queue.Add(request); 
    } 

    public void Stop() 
    { 
    this.queue.CompleteAdding(); 
    } 

    private void Process(MyRequestType request) 
    { 
    // Do your processing here 
    } 
} 

注:

は、ここでの例です。

関連する問題