rabbitMqキューからのメッセージを並行して処理したい。キューはautoAck = falseに設定されています。私はcamel endpointsのためのcamel-rabbitMQサポートを使用していますが、これはthreadPoolSizeパラメータをサポートしていますが、これには効果がありません。 threadpoolsize = 20の場合でも、メッセージはキューから順番に処理されます。ウサギMq Javaクライアント並列消費
コードをデバッグすると、threadpoolsizeパラメータが、hereのようにrabbit connectionfactoryに渡すために使用されるExecutorServiceを作成するために使用されることがわかります。あなたがウサギに入るまで、これはすべてよく見えますConsumerWorkService
。ここでは、メッセージは最大サイズ16のメッセージのブロックで処理されます。ブロック内の各メッセージは順次処理され、実行するサービスがあれば、次のブロックで起動されます。このコードスニペットは以下のとおりです。このエグゼキュータ・サービスの使用から、メッセージを並行してどのように処理できるかはわかりません。エグゼクティブサービスは、一度に1つの作業しか実行できません。
私には何がありませんか? ConsumerWorkService
は、スレッドプールを使用しているにもかかわらず、
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
異なるブロックサイズを使用するようにConsumerWorkServiceを設定できますか? –
こんにちはClaus、私はFergus Nelsonとしてgithub経由でCamel-rabbitmqコンポーネントにいくつか変更を加えました。私はRabbitMqConsumerを変更して、コンシューマーコンシューマーごとにチャネルを設定しました。私はすべてをテストしたときにJira +プルリクエストを作成します。 –
@ mR_fr0g、私が理解しているように、Camel-RabbitMQコンポーネントに複数のチャネルを作成することで問題を解決しました。あなたのJiraチケットへのリンクを提供し、要求を引き出し、修正が存在するCamelのバージョンを指定できますか? – wheleph