2017-01-04 11 views
1

私はマルチスレッドの概念に慣れていません。特定の数の文字列をキューに追加し、それらを複数のスレッドで処理する必要があります。スレッドセーフであるConcurrentQueueを使用する。ConcurrentQueue with multithreading

これは私が試みたものです。ただし、並行キューに追加されたアイテムはすべて処理されません。最初の4項目のみが処理されます。

class Program 
{ 
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>(); 
    static void Main(string[] args) 
    { 
     new Program().run(); 
    } 

    void run() 
    { 
     int threadCount = 4; 
     Task[] workers = new Task[threadCount]; 

     for (int i = 0; i < threadCount; ++i) 
     { 
      int workerId = i; 
      Task task = new Task(() => worker(workerId)); 
      workers[i] = task; 
      task.Start(); 
     } 

     for (int i = 0; i < 100; i++) 
     { 
      iQ.Enqueue("Item" + i); 
     } 

     Task.WaitAll(workers); 
     Console.WriteLine("Done."); 

     Console.ReadLine(); 
    } 

    void worker(int workerId) 
    { 
     Console.WriteLine("Worker {0} is starting.", workerId); 
     string op; 
     if(iQ.TryDequeue(out op)) 
     { 
      Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
     } 

     Console.WriteLine("Worker {0} is stopping.", workerId); 
    } 


} 
+0

for (int i = 0; i < 100; i++) { iQ.Enqueue("Item" + i); } Volatile.Write(ref doneEnqueueing, true); 

労働者が値をチェックします:あなたは、エンキューが行われた後trueに設定されるブール変数を定義することができます。それは何か異なった仕組みですか? – waynemodz

+0

ワーカーを開始すると、キューは空になります。 – CSharpie

+0

その機能は[ActionBlock ](https://msdn.microsoft.com/en-us/library/hh194684(v=vs.110).aspx)クラス –

答えて

4

実装にはいくつか問題があります。最初と明らか一つはworker方法が唯一、ゼロまたは1つのアイテムをデキューしてから停止していることである:

if(iQ.TryDequeue(out op)) 
    { 
     Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
    } 

それは次のようになります。しかし、あなたのプログラムを動作させるには十分ではありません

while(iQ.TryDequeue(out op)) 
    { 
     Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
    } 

正しく。あなたのワーカーがメインスレッドがエンキューするよりも速くデキューしている場合、メインタスクがまだエンキュー中に停止します。あなたが止めることができる労働者に知らせる必要があります。あなたが別の番号にTHREADCOUNTを変更した場合

void worker(int workerId) 
{ 
    Console.WriteLine("Worker {0} is starting.", workerId); 
    do { 
     string op; 
     while(iQ.TryDequeue(out op)) 
     { 
      Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
     } 
     SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0)); 
    } 
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0)) 
    Console.WriteLine("Worker {0} is stopping.", workerId); 
} 
+0

こんにちは、1つのクエリは、このシグナリングは、エンキューが完了するまで待機するスレッドを停止するのだろうか? –

+0

@Naveenkumarエンキューが完了するまでシグナルが_finishing_から停止し、キューが空です。エンキューが進行している間、作業者は既に開始します。 – Sefe

+0

おかげで、あなたが私に動作するように正しい方向を与えている:) –

3

あなたの労働者がqueueが空になるまで、ちょうどそれらが動作させ、queueのうちの一つのアイテムを取ると、その後の作業を終えます。

あなたはすべてのアイテムに近い2人の労働者によって処理されることを、あなたが表示されます、それを実行するとwhile

void worker(int workerId) 
{ 
    Console.WriteLine("Worker {0} is starting.", workerId); 
    string op; 
    while (iQ.TryDequeue(out op)) 
    { 
     Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
    } 

    Console.WriteLine("Worker {0} is stopping.", workerId); 
} 

でワーカー機能でifを交換してください。理由:あなたのCPUには2つのコアがあり、どちらも動作しており、新しいタスクを作成するための「フリーティームスロット」はありません。あなたがアイテムを処理するために、すべての4タスクを持っているしたい場合は、、のようなものをanotehrタスクを作成するには、お使いのプロセッサ時間を与えるために遅延を追加することができます。あなたが望むことを、あなたの出力が得られます

while (iQ.TryDequeue(out op)) 
{ 
    Console.WriteLine("Worker {0} is processing item {1}", workerId, op); 
    Task.Delay(TimeSpan.FromMilliseconds(1)).Wait(); 
} 

... 
Worker 0 is processing item Item8 
Worker 1 is processing item Item9 
Worker 2 is processing item Item10 
Worker 3 is processing item Item11 
Worker 3 is processing item Item13 
Worker 1 is processing item Item12 
... 
+0

労働者2及び3と同様に何も処理しない、処理ブロックのパイプラインを作成するために、TPLデータフローライブラリから他のブロックとそれを組み合わせることができます。各ワーカーが処理するアイテムの数はいくつですか? – waynemodz

+0

彼らは、他のタスクは、これはあなたがアイテムを処理した後の待機を追加することができますシミュレートするために、待っていない、働いている起動する空き時間がありません:\t \tます。Console.WriteLine(「労働者{0}処理している項目{1}」、workerId、 op); \t \t Task.Delay(TimeSpan.FromMilliseconds(1))。Wait(); –

+0

@waynemodz各ワーカーが処理するアイテムの数に制限はありません。アイテムがキューに追加されている場合、スレッドが空いている場合はアイテムを処理する必要があります。 –