2009-04-29 9 views
17

.NET用のスレッドセーフブロッキングキューの実装を探しています。 "スレッドセーフブロッキングキュー"とは: - Dequeueメソッド呼び出しがスレッドをブロックするキューへのスレッドセーフなアクセス。他のスレッドがある値を置く(Enqueue)までスレッドをブロックします。.NET上のスレッドセーフなブロッキングキューの実装

瞬時に私はこれを見つけました: http://www.eggheadcafe.com/articles/20060414.asp (ただし、.NET 1.1用です)

誰かがこの実装の正しさについてコメント/批判することができますか? もう1つお勧めします。 ありがとうございます。

答えて

8

この1つはどうですか?Creating a blocking Queue in .NET

.NET 1.1用に必要な場合は(私は質問から分かりませんでした)、汎用品をドロップしてをobjectに置き換えてください。

+0

リンクありがとうございます。それは "キュー"を検索してそのトピックを見つけられなかったのは奇妙です。さて、その話題と私のことは同じことです。 btw:.net 1.1に移植する必要はありません。 関連トピックの解決策はhttp://www.eggheadcafe.com/articles/20060414に非常によく似ています。asp – Shrike

+1

@Shrike、全く変なことではない。 StackOverflow検索の悪さのもう一つの例です。だから、みんながあなたにGoogle(と 'site:'コマンド)を代わりに使うように指示するのは悪いことです。 – Ash

+0

@Ash:おそらく本当です...それは確かに私がstackoverflowを検索する方法です; -p –

0

http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized(VS.71).aspx

はとにかく出発点ですQueue.Synchronized、私はブロッキングキューを使用したことがありません。それほど関係のない投稿には申し訳ありません。

+1

これは単なるスレッドセーフなキューだと思いますが、これは彼が求めているものとは少し異なります。 – tylerl

+1

ええ、私は投稿した後にそれを見ました。 –

+0

を反映するように編集しました。Queue.Synchronized()によって返されたデキューのラッパーメソッドは、現在のスレッドをブロックしますか? No. 2つのスレッドがあり、そのうちの1つがキューに値を設定し、もう1つはそのキューから取得しているとします。 2番目のスレッドはCPUを消費するループにキューをプールする必要があります。これは悪いです。 – Shrike

0

Microsoftはこのことについてかなりいいサンプルがあります:あなたはそれを完全に制御を持っている場合は、コードを呼び出すにロックすることは、より良い選択肢であることを覚えておいてください

//Copyright (C) Microsoft Corporation. All rights reserved. 

using System; 
using System.Threading; 
using System.Collections; 
using System.Collections.Generic; 

// The thread synchronization events are encapsulated in this 
// class to allow them to easily be passed to the Consumer and 
// Producer classes. 
public class SyncEvents 
{ 
    public SyncEvents() 
    { 
     // AutoResetEvent is used for the "new item" event because 
     // we want this event to reset automatically each time the 
     // consumer thread responds to this event. 
     _newItemEvent = new AutoResetEvent(false); 

     // ManualResetEvent is used for the "exit" event because 
     // we want multiple threads to respond when this event is 
     // signaled. If we used AutoResetEvent instead, the event 
     // object would revert to a non-signaled state with after 
     // a single thread responded, and the other thread would 
     // fail to terminate. 
     _exitThreadEvent = new ManualResetEvent(false); 

     // The two events are placed in a WaitHandle array as well so 
     // that the consumer thread can block on both events using 
     // the WaitAny method. 
     _eventArray = new WaitHandle[2]; 
     _eventArray[0] = _newItemEvent; 
     _eventArray[1] = _exitThreadEvent; 
    } 

    // Public properties allow safe access to the events. 
    public EventWaitHandle ExitThreadEvent 
    { 
     get { return _exitThreadEvent; } 
    } 
    public EventWaitHandle NewItemEvent 
    { 
     get { return _newItemEvent; } 
    } 
    public WaitHandle[] EventArray 
    { 
     get { return _eventArray; } 
    } 

    private EventWaitHandle _newItemEvent; 
    private EventWaitHandle _exitThreadEvent; 
    private WaitHandle[] _eventArray; 
} 

// The Producer class asynchronously (using a worker thread) 
// adds items to the queue until there are 20 items. 
public class Producer 
{ 
    public Producer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     Random r = new Random(); 
     while (!_syncEvents.ExitThreadEvent.WaitOne(0, false)) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       while (_queue.Count < 20) 
       { 
        _queue.Enqueue(r.Next(0, 100)); 
        _syncEvents.NewItemEvent.Set(); 
        count++; 
       } 
      } 
     } 
     Console.WriteLine("Producer thread: produced {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

// The Consumer class uses its own worker thread to consume items 
// in the queue. The Producer class notifies the Consumer class 
// of new items with the NewItemEvent. 
public class Consumer 
{ 
    public Consumer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       int item = _queue.Dequeue(); 
      } 
      count++; 
     } 
     Console.WriteLine("Consumer Thread: consumed {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

public class ThreadSyncSample 
{ 
    private static void ShowQueueContents(Queue<int> q) 
    { 
     // Enumerating a collection is inherently not thread-safe, 
     // so it is imperative that the collection be locked throughout 
     // the enumeration to prevent the consumer and producer threads 
     // from modifying the contents. (This method is called by the 
     // primary thread only.) 
     lock (((ICollection)q).SyncRoot) 
     { 
      foreach (int i in q) 
      { 
       Console.Write("{0} ", i); 
      } 
     } 
     Console.WriteLine(); 
    } 

    static void Main() 
    { 
     // Configure struct containing event information required 
     // for thread synchronization. 
     SyncEvents syncEvents = new SyncEvents(); 

     // Generic Queue collection is used to store items to be 
     // produced and consumed. In this case 'int' is used. 
     Queue<int> queue = new Queue<int>(); 

     // Create objects, one to produce items, and one to 
     // consume. The queue and the thread synchronization 
     // events are passed to both objects. 
     Console.WriteLine("Configuring worker threads..."); 
     Producer producer = new Producer(queue, syncEvents); 
     Consumer consumer = new Consumer(queue, syncEvents); 

     // Create the thread objects for producer and consumer 
     // objects. This step does not create or launch the 
     // actual threads. 
     Thread producerThread = new Thread(producer.ThreadRun); 
     Thread consumerThread = new Thread(consumer.ThreadRun); 

     // Create and launch both threads.  
     Console.WriteLine("Launching producer and consumer threads...");   
     producerThread.Start(); 
     consumerThread.Start(); 

     // Let producer and consumer threads run for 10 seconds. 
     // Use the primary thread (the thread executing this method) 
     // to display the queue contents every 2.5 seconds. 
     for (int i = 0; i < 4; i++) 
     { 
      Thread.Sleep(2500); 
      ShowQueueContents(queue); 
     } 

     // Signal both consumer and producer thread to terminate. 
     // Both threads will respond because ExitThreadEvent is a 
     // manual-reset event--so it stays 'set' unless explicitly reset. 
     Console.WriteLine("Signaling threads to terminate..."); 
     syncEvents.ExitThreadEvent.Set(); 

     // Use Join to block primary thread, first until the producer thread 
     // terminates, then until the consumer thread terminates. 
     Console.WriteLine("main thread waiting for threads to finish..."); 
     producerThread.Join(); 
     consumerThread.Join(); 
    } 
} 
0

を。ループ内でキューにアクセスすることを検討してください。不必要にロックを複数回取得するため、パフォーマンスが低下する可能性があります。

1

Microsoftの例は優れていますが、クラスにはカプセル化されていません。また、(WaitAny呼び出しのため)消費者スレッドがMTAで実行されている必要があります。場合によっては、STAで実行する必要がある場合もあります(たとえば、COM相互運用を行っている場合)。このような場合、WaitAnyは使用できません。

私はここに、この問題を克服し、簡単なブロッキングキュークラスがあります。参考のため http://element533.blogspot.com/2010/01/stoppable-blocking-queue-for-net.html

20

を、.NET 4には、この問題に対処するためにSystem.Collections.Concurrent.BlockingCollection<T>タイプを紹介します。ノンブロッキングキューの場合は、System.Collections.Concurrent.ConcurrentQueue<T>を使用できます。 ConcurrentQueue<T>は、OPの使用のためにBlockingCollection<T>の基になるデータストアとして使用される可能性が高いことに注意してください。

+0

これを実際に見られるようにいくつかのメソッドを追加するコードサンプルを提供できますか? – theJerm

関連する問題