更新:私は説明があまりにも冗長であると思ったので、私がやろうとしていることを簡単に考えていると思います。私は、さまざまなタスクのためにそれらを利用したい複数のワーカーノードを持っています異なるキューから)。現在のところ、単一のキューに耳を傾けて、1つのタイプの作業に関連付ける方法しか知りませんが、別のキューが聞こえるようにして、ノードの同じクラスタがそれらを処理できるようにします。より明確なことを望みます。Javaを複数のRabbitMQキューにリッスンすることはできますか?
こんにちは皆、 私はこれが可能である疑いがあるが、私はそれを行うには、正確に方法を見つけ出すように見えることはできません。私はrabbitmqのサイト上のチュートリアルを見て、彼らは本当に役に立ち、同じプログラムで複数のキューを聴く方法を示していないことを除いて、私が望んだことをしました。
私のプログラム構造は、基本的にはいくつかのフェーズです。例えば、フェーズ1ではデータが多く収集され、フェーズ2ではそれを処理してデータベースにロードし、フェーズ3では他のデータなどと比較します。前のフェーズが完了するまで開始し、複数のマシンを使用して各フェーズをより早く完了させるためにキューシステムを使用したい(すべての消費者がフェーズ1で作業し、すべてのフェーズが完了したら、フェーズ2で一緒に作業するなど) 。
キューが空でコンピュータが次のキューに移動する可能性があり、すべての作業が完了しているか、完了しているためにコンピュータが空であるかどうかを知る方法がないため、我々はまだキューに作業を入れ始めていないからです。だから私は(私が間違っていれば私を修正する)良い方法は、すべてのフェーズに関連付けられているすべてのキューを聞くことだったと思った仕事はフェーズ1Queueに置かれて、それに動作し、仕事がphase2Queueに入れられたら、私はプロセスの外に別のプロセスを持っています。私は、各フェーズが完了したときにモニターを記述して、次のフェーズをセットアップします)。希望は意味をなさない。
キューサンプルのコードは、1つのキューをリッスンする消費者にとって役立ちますが、複数のキューを聴取するにはどうすればよいですか(また、キューごとに異なるプログラムを呼び出す方法)。これにはすでに機能があっても素晴らしいですが、私はjavaでこれを実装するために使用できるロジックを探しています(最悪の場合、各キューをリッスンする5つのプログラムを実行することを考えましたが、より良い方法があれば、1つのアプリケーションにすべての作業を含めると、配布の管理が容易になります)。
ありがとうございます!
p.s.
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
アップデート:コメントを1として、コーディネータ・プロセスは超簡単ですが、それはここrabbitmqのために働く消費者のコードがあります(ただし、あなたが見ることができるように、それが唯一のキューを定義する)場合に役立ちます。 Phase1Queueから監視し、一度そのキューが空であれば、単にPhase2Queueなどを満たすプロセスを開始します。
私はRabbitMQに慣れていませんが、キューごとに複数のコンシューマを作成するのではありませんか?しかし、各マシンをどのようにフェーズ1からフェーズ2に移行するかは明確ではありません(おそらくコーディネーターを使用しています)。単一のキューを使用してタスクにタグを付けて、受信者がそれをどのように処理するか(つまりタスクはどのフェーズに対応するのか)を伝えることができます。 – DNA
@DNAありがとう!実際には良いアイデアだと思っていませんでしたが、うまくいくかもしれません。私のコーディネータープログラムは、それがどの段階にあるのかを把握する必要がありますが、私のメインプログラムでフェーズを追加/削除するのも簡単です。私は、rabbitmqで複数の引数を送信することが可能かどうかは分かりませんが、今すぐ見てみましょう。 –