2013-10-08 7 views
5

rabbitMqキューからのメッセージを並行して処理したい。キューはautoAck = falseに設定されています。私はcamel endpointsのためのcamel-rabbitMQサポートを使用していますが、これはthreadPoolSizeパラメータをサポートしていますが、これには効果がありません。 threadpoolsize = 20の場合でも、メッセージはキューから順番に処理されます。ウサギMq Javaクライアント並列消費

コードをデバッグすると、threadpoolsizeパラメータが、hereのようにrabbit connectionfactoryに渡すために使用されるExecutorServiceを作成するために使用されることがわかります。あなたがウサギに入るまで、これはすべてよく見えますConsumerWorkService。ここでは、メッセージは最大サイズ16のメッセージのブロックで処理されます。ブロック内の各メッセージは順次処理され、実行するサービスがあれば、次のブロックで起動されます。このコードスニペットは以下のとおりです。このエグゼキュータ・サービスの使用から、メッセージを並行してどのように処理できるかはわかりません。エグゼクティブサービスは、一度に1つの作業しか実行できません。

私には何がありませんか? ConsumerWorkServiceは、スレッドプールを使用しているにもかかわらず、

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

異なるブロックサイズを使用するようにConsumerWorkServiceを設定できますか? –

+1

こんにちはClaus、私はFergus Nelsonとしてgithub経由でCamel-rabbitmqコンポーネントにいくつか変更を加えました。私はRabbitMqConsumerを変更して、コンシューマーコンシューマーごとにチャネルを設定しました。私はすべてをテストしたときにJira +プルリクエストを作成します。 –

+0

@ mR_fr0g、私が理解しているように、Camel-RabbitMQコンポーネントに複数のチャネルを作成することで問題を解決しました。あなたのJiraチケットへのリンクを提供し、要求を引き出し、修正が存在するCamelのバージョンを指定できますか? – wheleph

答えて

3

のRabbitMQのマニュアルは、このことについて非常に明確ではないが、このプールは、並行してメッセージを処理する方法で使用されていないようです:

各チャンネルには独自のディスパッチスレッドがあります。チャネル当たり1つのコンシューマーの最も一般的な使用例については、これは、コンシューマーが他のコンシューマーを押さえていないことを意味します。チャネルごとに複数のコンシューマーがいる場合は、長期実行コンシューマーがそのチャネル上の他のコンシューマーへのコールバックのディスパッチを保留する可能性があることに注意してください。

http://www.rabbitmq.com/api-guide.html

このドキュメントスレッドごとに1 Channelを使用することを提案して、あなたは、単に並行処理の要求レベルと同じ数のChannel Sを作成した場合、実際には、メッセージがにリンクされている消費者の間で派遣されますこれらのチャンネル

私は2つのチャネルとコンシューマでテストしました.2つのメッセージがキューにある場合、各コンシューマは一度に1つのメッセージしか選択しません。あなたが言及した16のメッセージのブロックは干渉していないようですが、これは良いことです。

実際、Spring AMQPはメッセージを同時に処理する複数のチャネルとしても作成します。 SimpleMessageListenerContainer.setConcurrentConsumers(...)設定

私は期待通りに動作するために、これをもテストしてみました。

3

Channelインスタンスが1つある場合は、ConsumerWorkServiceを調べて正しく見つかったように、登録済みのコンシューマをシリアルに呼び出すことになります。それを克服するには2つの方法があります。

  1. 1つではなく複数のチャネルを使用します。
  2. 単一チャネルを使用しますが、コンシューマを特別な方法で実装します。彼らはただキューから入ってきたメッセージを選び、それをタスクとして内部スレッドプールに入れるべきです。

this postで詳細を見つけることができます。

関連する問題