2016-09-17 12 views
-1

並行キューにintegeresを追加する複数のタスクに苦労しています。私はいくつかのチュートリアルを見てきましたが、かなり複雑になっています。また、私は消費者 - 生産者の問題を実装したいので、これをしたいと思います。複数のタスクが並行キューに値を追加する方法

私は

class Program 
    { 
     const int MAX_VALUE = 20; 
     const int valueP = 0; 
     static BlockingCollection<int> bc = new BlockingCollection<int>(new ConcurrentQueue<int>(), 20); 

     static void producer(int value) 
     { 

      for (int i = 0; i < MAX_VALUE; i++) 
      { 
       bc.Add(value); 

       value++; 
       Console.WriteLine(value); 
       Thread.Sleep(2000); 
      } 
      Thread.Sleep(2000); 
     } 
     static void consuemr() 
     { 
      int item = 0; 
      item = bc.Take(); 
      Console.WriteLine("An item has been consumed", item); 
      Thread.Sleep(2000); 
     } 
     static void Main(string[] args) 
     { 
      Task t1 =Task.Run(() => producer(valueP)); 
      Task t2 = Task.Run(() => producer(valueP)); 

      Task.WaitAll(t1, t2); 
      Task.Run(() =>consuemr()); 


     } 
     } 

をやろうとしてきたが、その時間は、消費者

+0

静的なConcurrentQueueを作成してから、Parallel.For(0、int.MaxValue、i => {myConcurrentQueue.Enqueue(i);})のように追加します。あなたのコードを分かち合いたいと思っているのであれば、あなたが何を尋ねようとしているのかを理解することができます。それは何かをもっと意味のあるものにするために長い道のりになります。私はグーグルで、あなたの質問は具体的なものでなければなりませんか? –

+0

非常に便利です。書いたコードを分かち合うつもりですが、大丈夫かどうかわかりません。@ShannonHolsinger –

答えて

1

を主導したときに物事はあなたが指定した特定のコレクションが保持できる20項目の制限を持って作業を停止何これBlockingCollectionのコンストラクタです。

あなたは40個のアイテムを入れるので、20個のアイテムが追加されると、そのメソッドはブロックされます。

static BlockingCollection<int> bc = new BlockingCollection<int>(new ConcurrentQueue<int>(), 100); 

にコンストラクタに変更しようと、あなたはそれがその後、動作しますが表示されます。

これを解決するには、生産者がアイテムを追加している間に消費者がアイテムを取っていることを確認する必要があります。

あなたはすべての項目が追加されるまで、それが待機する生産者にTask.WaitAllを行うので(なぜならブロックしますAdd()方法に20の項目の次の呼び出しを追加した後に起こることはありません、https://msdn.microsoft.com/en-us/library/dd287137(v=vs.110).aspxの発言のセクションを参照してください)。

あなたは生産者がアイテムを追加またはBlockingCollectionのは、UpperBoundは、あなたがそれらを取る前に追加されるアイテムの数よりも大きいことを確認している間に、消費者がアイテムを取っているように、あなたのコードを再設計する必要があります。あなたの現在のコードに加えて

、私はこのような非生産コードで.WAIT()または.WaitAllを(使用していることを得る

Task.Run(() =>consuemr()).Wait(); 

Task.Run(() =>consuemr()); 

最後の行を置き換えます)問題ではありませんが、プロダクションコードでは、非同期の使用をお勧めします。

私は私のヒントを期待していた

完全な例はあなたに解決する方法についていくつかのアイデアを与えました。この方法を試してください。

class Program 
{ 
    const int MAX_VALUE = 20; 
    const int valueP = 0; 
    static BlockingCollection<int> bc = new BlockingCollection<int>(new ConcurrentQueue<int>(), 20); 

    static void producer(int value) 
    { 
     for (int i = 0; i < MAX_VALUE; i++) 
     { 
      bc.Add(value); 

      value++; 
      Console.WriteLine("Producing value {0}", value); 
      Thread.Sleep(20); 
     } 
     Thread.Sleep(20); 
    } 

    static void Consumer() 
    { 
     foreach(var item in bc.GetConsumingEnumerable()) 
     { 
      Console.WriteLine("An item has been consumed: {0}", item); 
     } 
    } 

    static void Main(string[] args) 
    { 
     Task t1 =Task.Run(() => producer(valueP)); 
     Task t2 = Task.Run(() => producer(valueP)); 
     Task t3 = Task.Run(() => Consumer()); 

     Task.WaitAll(t1, t2); 

     bc.CompleteAdding(); // signal end of producing values 

     t3.Wait(); // wait for consumer to read any unread values 
    } 
} 

は、UpperBound上限に達していないので、生産者がアイテムを追加している間、今別のスレッドで行われているアイテムを消費します。

別のオプションではなく、あなたは、例えば、ユーザ入力に基づいてアイテムを消費して停止するようにパラメータとしてCancellationTokenを取るGetConsumingEnumerableを使用することができbc.CompleteAdding();GetConsumingEnumerable()を使用してのです。

また、これは、(既存のコードを置き換え)と同様に動作します:

static void Consumer() 
{ 
    int item; 
    while(!bc.IsCompleted) 
    { 
     item = bc.Take(); 
     Console.WriteLine("An item has been consumed: {0}", item); 
     Thread.Sleep(20); 
    } 
} 

static void Main(string[] args) 
{ 
    Task t1 =Task.Run(() => producer(valueP)); 
    Task t2 = Task.Run(() => producer(valueP)); 
    Task t3 = Task.Run(() => Consumer()); 

    Task.WaitAll(t1, t2); 

    bc.CompleteAdding(); // signal end of producing values 

    t3.Wait(); // wait for consumer to read any unread values 
} 

私はこれがあなたに良いアイデアを与える願っています。解決すべき現実世界の問題がある場合は、他に必要なものを説明してください。ブロッキングコレクションから読み込む方法は数多くあるため、具体的な状況によって異なります。

+0

大丈夫ですしかし、私はまだプロデューサーが生産している間に消費者にアイテムを服用させるためにどうすればいいのかわかりません@Peter Bons –

+0

@SalvadorRomoは更新の回答を参照してください:-) –

+0

これは私が待っていたものです。私はどこで始めることができるのか分からなかったので、練習のためだけにティを実装したいので、感謝の男!!!他の消費者やプロデューサーを追加するだけで、別のタスクを追加するのと同じくらい簡単ですね。 @Peter Bons、 –

関連する問題