2012-03-04 5 views
1

更新:私は説明があまりにも冗長であると思ったので、私がやろうとしていることを簡単に考えていると思います。私は、さまざまなタスクのためにそれらを利用したい複数のワーカーノードを持っています異なるキューから)。現在のところ、単一のキューに耳を傾けて、1つのタイプの作業に関連付ける方法しか知りませんが、別のキューが聞こえるようにして、ノードの同じクラスタがそれらを処理できるようにします。より明確なことを望みます。Javaを複数のRabbitMQキューにリッスンすることはできますか?


こんにちは皆、 私はこれが可能である疑いがあるが、私はそれを行うには、正確に方法を見つけ出すように見えることはできません。私はrabbitmqのサイト上のチュートリアルを見て、彼らは本当に役に立ち、同じプログラムで複数のキューを聴く方法を示していないことを除いて、私が望んだことをしました。

私のプログラム構造は、基本的にはいくつかのフェーズです。例えば、フェーズ1ではデータが多く収集され、フェーズ2ではそれを処理してデータベースにロードし、フェーズ3では他のデータなどと比較します。前のフェーズが完了するまで開始し、複数のマシンを使用して各フェーズをより早く完了させるためにキューシステムを使用したい(すべての消費者がフェーズ1で作業し、すべてのフェーズが完了したら、フェーズ2で一緒に作業するなど) 。

キューが空でコンピュータが次のキューに移動する可能性があり、すべての作業が完了しているか、完了しているためにコンピュータが空であるかどうかを知る方法がないため、我々はまだキューに作業を入れ始めていないからです。だから私は(私が間違っていれば私を修正する)良い方法は、すべてのフェーズに関連付けられているすべてのキューを聞くことだったと思った仕事はフェーズ1Queueに置かれて、それに動作し、仕事がphase2Queueに入れられたら、私はプロセスの外に別のプロセスを持っています。私は、各フェーズが完了したときにモニターを記述して、次のフェーズをセットアップします)。希望は意味をなさない。

キューサンプルのコードは、1つのキューをリッスンする消費者にとって役立ちますが、複数のキューを聴取するにはどうすればよいですか(また、キューごとに異なるプログラムを呼び出す方法)。これにはすでに機能があっても素晴らしいですが、私はjavaでこれを実装するために使用できるロジックを探しています(最悪の場合、各キューをリッスンする5つのプログラムを実行することを考えましたが、より良い方法があれば、1つのアプリケーションにすべての作業を含めると、配布の管理が容易になります)。

ありがとうございます!

p.s.

import java.io.IOException; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Worker { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 

    public static void main(String[] argv) 
         throws java.io.IOException, 
         java.lang.InterruptedException { 

    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

    channel.basicQos(1); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer); 

    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 

     System.out.println(" [x] Received '" + message + "'"); 
     doWork(message); 
     System.out.println(" [x] Done"); 

     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
    } 
    } 
    //... 
} 

アップデート:コメントを1として、コーディネータ・プロセスは超簡単ですが、それはここrabbitmqのために働く消費者のコードがあります(ただし、あなたが見ることができるように、それが唯一のキューを定義する)場合に役立ちます。 Phase1Queueから監視し、一度そのキューが空であれば、単にPhase2Queueなどを満たすプロセスを開始します。

+1

私はRabbitMQに慣れていませんが、キューごとに複数のコンシューマを作成するのではありませんか?しかし、各マシンをどのようにフェーズ1からフェーズ2に移行するかは明確ではありません(おそらくコーディネーターを使用しています)。単一のキューを使用してタスクにタグを付けて、受信者がそれをどのように処理するか(つまりタスクはどのフェーズに対応するのか)を伝えることができます。 – DNA

+0

@DNAありがとう!実際には良いアイデアだと思っていませんでしたが、うまくいくかもしれません。私のコーディネータープログラムは、それがどの段階にあるのかを把握する必要がありますが、私のメインプログラムでフェーズを追加/削除するのも簡単です。私は、rabbitmqで複数の引数を送信することが可能かどうかは分かりませんが、今すぐ見てみましょう。 –

答えて

1

別の方法は、異なるモジュール/サービス/プロセスに別々のフェーズを分けることです。それらのすべてが同時に実行されます。

各フェーズの消費者は、異なるキューを待ち受けます。各フェーズは、次のフェーズのキューのメッセージを生成します。

このようにすれば、単一の責任を負うことで複雑さを軽減できます。各プロセスは特定のキューを消費し、別のキューを生成します。必要に応じて、これらの別々のプロセスを拡大/縮小することができます。

これは私たちが使用しているパターンです。我々は最終的に100倍以上のサブジョブを作成する初期ジョブを持っています。最初のジョブは非常に小さく、実行が速いのに対して、サブジョブは潜在的に長時間実行されるリクエストであり、少数のクラウドインスタンスでサービスを提供します。これらのジョブは、結果を別のキューで戻します。その結果は照合され、データベースに追加されます。

関連する問題