2017-02-07 11 views
0

複数のスレッドからの挿入を許可するConcurrentLinkedQueueがありますが、キューをポーリングするときに1つの関数で行い、キューが空になるまでポーリングします。これは、ポーリング中にスレッドがキューに挿入される可能性があるため、無限ループにつながります。 キューのビューを作成してポーリング前に空にしても、スレッドセーフにするにはどうすればよいですか?無限ループを回避するためにキューをスナップショットする方法

+0

私はイテレータがあなたが探しているものだと思います。 – NAMS

+0

@NAMS並行イテレータは弱く一貫性があります。その時のコレクションの視点である構築時に見られるものが保証されますが、繰り返しごとに要素がリークする可能性があります。追加するたびに100%の更新があれば、開始した場所に戻ります。 – xTrollxDudex

+1

なぜ空になるまでキューからすべてを取り出さなければならないのですか?あなたは、アイテムを引き出し始める前に、キュー内の要素の数を取得し、多くのアイテムを最大限に取り出すような何かを行うことができます。または、1つを取り出してすぐに処理できます。 –

答えて

1

私が見るものの1つの方法は、ConcurrentLinkedDequeを使用して、最後に追加したアイテムに到達するまで繰り返します。単一の終了キューでこれを行うことはできません。なぜなら、読み取りが最初に頭を見て、最後に追加された要素を見つけるためにテールを読み取る必要があるからです。

ConcurrentLinkedDequeが動作する方法は、offer(Object)add(Object)を呼び出すと、その項目がキューの末尾に配置されます。 poll()への通話ので、同じように、キューの先頭を読みます:あなたはより多くの項目を追加

// Read direction ---> 
HEAD -> E1 -> E2 -> E3 = TAIL 
// Write direction ---> 

として、尾は最後の要素を拡張しますが、我々は、我々は最後にそれを見たように、キューを空にしたいので、テールポインタをつかんで、テールに達するまで繰り返します。その後、キューを空にしている間に、追加されたものを後続の反復で扱うことができます。 We peek最初にpollを使用すると、最後に追加された値が削除されるため、マーカーが削除されるため、要素の削除をいつ終了するかを判断できません。あなたがプロデューサのデフォルトoffer(Object)add(Object)としてプロデューサーのコードを変更する必要はありません

ConcurrentLinkedDeque<Object> deque = new ConcurrentLinkedDeque<>(); 

public void emptyCurrentView() { 
    Object tail = deque.peekLast(); 
    if (tail != null) { 
     while (true) { 
      // Poll the current head 
      Object current = deque.poll(); 

      // Process the element 
      process(current); 

      // If we finish processing the marker 
      // Exit the method 
      if (current == tail) { 
       return; 
      } 
     } 
    } 
} 

は、末尾に要素を追加することとまったく同じことを行います。

0

キューのビューを作成し、ポーリング前に空にしてもスレッドセーフであるようにするにはどうすればよいですか?

これは本当に悪いパターンのようです。並行キュー実装を使用するポイントは、同時にキューに追加したりキューから削除したりできる点です。あなたはConcurrentLinkedQueueに固執したいなら、私はちょうどこのような何かをしたい:それはtake()をサポートしているので

// run every so often 
while (true) { 
    // returns null immediately if the queue is empty 
    Item item = blockingQueue.poll(); 
    if (item == null) { 
     break; 
    } 
    // process the item... 
} 

しかし、私は、代わりにLinkedBlockingQueueを使用するように切り替える検討します。

private final BlockingQueue<Item> blockingQueue = new LinkedBlockingQueue<>(); 
... 
while (!Thread.currentThread().isInterrupted()) { 
    // wait for the queue to get an item 
    Item item = blockingQueue.take(); 
    // process item... 
} 

BlockingQueueそうpoll()ループも利用可能であるQueueを拡張する:消費者スレッドは、このようなループになるであろう。

関連する問題