2009-11-18 11 views
27

私はオブジェクトのブロッキングキューを持っています。java BlockingQueueにブロッキング・ピークがありませんか?

キューにオブジェクトがあるまでブロックするスレッドを作成したいと思います。 BlockingQueue.take()によって提供される機能に似ています。

しかし、オブジェクトを正常に処理できるかどうかわからないので、私はちょうどpeek()してオブジェクトを削除しないでください。私はそれを正常に処理できる場合にのみオブジェクトを削除したいと思います。

したがって、私はブロッキングpeek()関数が好きです。現在、peek()は、javadocsのようにキューが空の場合にのみ返します。

何か不足していますか?この機能を実現する別の方法はありますか?

編集:私はちょうど、スレッドセーフなキューを使用して覗くと、代わりに眠っていた場合に

任意の考え?

public void run() { 
    while (!__exit) { 
     while (__queue.size() != 0) { 
      Object o = __queue.peek(); 
      if (o != null) { 
       if (consume(o) == true) { 
        __queue.remove(); 
       } else { 
        Thread.sleep(10000); //need to backoff (60s) and try again 
       } 
      } 
     } 
     Thread.sleep(1000); //wait 1s for object on queue 
    } 
} 

私は1つのコンシューマスレッドと1つの(別の)プロデューサスレッドしか持たないことに注意してください。私はこれがBlockingQueueを使用するのと同じくらい効果的ではないと思います...

答えて

14

あなたはLinkedBlockingDequeを使用して、物理的に(takeLast()を使用して)キューから項目を削除しますが、処理がputLast(E e)を使用して失敗した場合で再びキューの終わりに、それを置き換えることができます。一方、あなたの "生産者"はputFirst(E e)を使用してのフロントにの要素を追加します。

あなたは常にあなた自身のQueue実装内でこの動作をカプセル化し、基礎となるLinkedBlockingDequeの舞台裏putLast()続いtakeLast()を行いblockingPeek()方法を提供することができます。したがって、呼び出し側クライアントの観点からは、要素は決してキューから削除されません。

+2

これは良い提案です。私がここで見ることができる唯一の問題は、アイテムを処理している間にキューがいっぱいになると、現在のアイテムをキューに戻すことができなくなるということです。 – rouble

+0

ラッパーの実装で追加の同期を使用することでこれを得ることができます。したがって、take +アトミック操作を実行します。無制限のキューを使用することもできます。 – Adamski

+2

キューの状態の変更を他のスレッドに公開することが難しいため、削除と再追加をお勧めします。おそらくpeek()を実装するために多忙なポーリングを使用するかもしれません。またはポーリングしたくない場合は、ラッパーのキューに関連付けられたセマフォを使用してください。 –

1

イベントリスナーキューをブロッキングキューに追加するだけで、(ブロッキング)キューに何かが追加されたときにリスナーにイベントを送信することもできますか? actionPerformedメソッドが呼び出されるまでスレッドブロックをブロックすることができます。

2

私はそれを承知している唯一のことは、これはApache Commons CollectionsBlockingBufferあるん

取得または削除のいずれかの場合には、追加することを、空のバッファ、呼び出し元のスレッド通知の 待機 に呼び出されましたか addAll操作が完了しました。

get()peek()に相当し、Bufferはあなたが指定している機能はありませんBlockingBuffer

0

BlockingQueueの自身のように見えるとUnboundedFifoBufferを飾ることでBlockingQueueのように作用させることができます。

問題を少しでも再構成しようとするかもしれません。「正しく処理できない」オブジェクトではどうしますか?あなたがそれらをキューに残しているだけなら、ある時点でそれらを取り出してそれらを処理する必要があります。私はそれらを処理する方法を考え出すことをお勧めします(通常、queue.get()が何らかの無効または悪い値を与える場合、おそらく床に落とすだけでOKです)。 FIFO。

1

クイックアンサーは、本当にブロッキングピークがあるわけではありません。ブロッキングpeek()を使用してブロッキングキューを実装しているバーです。

何か不足していますか?

PEEK()は並行処理で面倒なことができます - あなたは、複数の消費者を持っていない限り、それは、キューに残されます - あなたはPEEK()、D 'のメッセージを処理できない場合は

  • 処理できない場合は、誰がそのオブジェクトをキューから取得しますか?
  • コンシューマが複数ある場合、peek()とアイテムを処理する別のスレッドとの競合状態が発生し、処理が重複したり悪化したりします。再:あなたの最後の例:

は、あなたが実際に項目を削除し、 Chain-of-responsibility pattern

Editを使用して、それを処理したほうが良いかもしれないような音だけ1人の消費者を持っている場合は、あなたが取り除くことは決してありませんキューにあるオブジェクト(その間に更新されていない場合)は、スレッドの安全性に非常に注意する方がよいでしょう。おそらく、キューにアイテムを置いてはいけません。

6

しかし、オブジェクトを正常に処理できるかどうかわからないので、私はちょうどpeek()してオブジェクトを削除しないといけません。私はそれを正常に処理できる場合にのみオブジェクトを削除したいと思います。

一般に、スレッドセーフではありません。 peek()の後に、オブジェクトが正常に処理できると判断した後、削除する前にtake()を削除して処理すると、別のスレッドがそのオブジェクトを取得する場合はどうなりますか?

+0

IMHOこの問題は、折り返して同期を取ることで解決できます。論理 – yerlilbilgin

関連する問題