2012-03-06 6 views
12

送信するメッセージを含む複数のBlockingQueuesがあります。キューよりも消費者を少なくすることは可能ですか?私はキューをループしてポーリングを続ける(ビジー待ち)ので、キューごとにスレッドを必要としません。代わりに、私は、メッセージがいずれかのキューで利用可能なときに目覚めたスレッドを1つ持っていたいと思います。複数のブロックキュー、単一のコンシューマ

+5

これと複数のプロデューサに与えられる単一のブロッキングキューとの違いは何ですか? –

+0

Alexが達成したいことは、複数のブロッキングキューの上にブロッキングキュー(ラッパー)を作成し、コンシューマが単一のブロッキングキューを待つことができるようにすることだと思います。おそらくこの状況は、Alexが同じブロッキング・キュー・インスタンスを使用するようにプロデューサに要求することを防ぎます。 – sjlee

+1

問題は、キューごとに複数のコンシューマを必要としないことです。 1つのキューにすべてをダンプすると、消費者は同じキューから食べることができます。 したがって、私はAのキューとBのキューを持っています。他のBがまだ取られている限り、Bは取られない。 – Alex

答えて

6

あなたができるトリックの1つは、キューのキューを持つことです。つまり、すべてのスレッドがサブスクライブする単一のブロッキングキューを用意することです。次に、BlockingQueuesのいずれかに何かをエンキューすると、この単一のキューにブロックキューもエンキューされます。

BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS]; 
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>(); 

新しい作業項目を取得する場合次に:

void addWorkItem(int queueIndex, WorkItem workItem) { 
    assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number"; 
    //Note: You may want to make the two operations a single atomic operation 
    producers[queueIndex].add(workItem); 
    producerProducer.add(producers[queueIndex]); 
} 

を今すぐあなたの消費者がproducerProducer上のすべてのブロックをできるように、のようなものを持っているでしょう。私はこの戦略がどれほど価値があるかはわかりませんが、あなたが望むものを達成しています。

+0

シンプルで効果的!ありがとう! – Alex

+0

@Alex:問題ありません。どういう仕組みになっているのは本当に好奇心です。 – mindvirus

4

LinkedBlockingMultiQueueあなたが求めていることはありますか?コンシューマは任意のBlockingQueuesをブロックすることはできませんが、単一の「マルチキュー」から「サブキュー」を作成して同じ効果を得ることは可能です。プロデューサはサブキューで提供され、コンシューマは単一のマルチキューをポーリングして要素を待つことをブロックできます。

また、他の要素を考慮する前に、いくつかのキューから要素を取り出すことも、優先順位をサポートしています。

例:

LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>(); 
q.addSubQueue(1 /* key */, 10 /* priority */); 
q.addSubQueue(2 /* key */, 10 /* priority */); 
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1); 
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2); 

次に、あなたが提供できると投票:

sq1.offer("x1"); 
q.poll(); // "x1" 
sq2.offer("x2"); 
q.poll(); // "x2" 

は免責事項:私は、ライブラリの作者です。

関連する問題