0

RabbitMQを使用して、1台のマシンのプロデューサから、複数のマシンに分散された小さなグループのコンシューマにジョブを送信します。RabbitMQ:非ポーリング戦略を使用して複数のコンシューマを1つのキューに配置できますか?

プロデューサはジョブを生成してキューに配置し、コンシューマは10ミリ秒ごとにキューをチェックして、要求されていないジョブがあるかどうかを確認し、ジョブが使用可能な場合は一度にジョブをフェッチします。ある特定のワーカーがジョブを処理するのに時間がかかりすぎると(GCが一時停止したり、他の一時的な問題が発生した場合)、他のコンシューマはジョブをキューから削除してジョブのスループットを向上させます。

このシステムを最初に設定したとき、待ち行列に複数のコンシューマのサブスクライバ関係を設定して、余分なビットをポーリングして導入するのを防ぐことができませんでした。

ドキュメントを検査しても満足のいく回答が得られていません。私たちはメッセージキューを使うのが初めてであり、上記のシナリオを正確に記述する言葉がわからない可能性があります。これはblackboard systemのようなものですが、この場合、「専門家」はすべて同一であり、決してお互いの結果を消費することはありません。結果は常にジョブ作成者に報告されます。

アイデア?

答えて

0

ここで、rabbitMQチャンネルはスレッドセーフではないことに注意してください。 そう

ように私は私が2つのキューのコードサンプルを書かれているSCALA

Object QueueManager{ 

     val FACTORY = new ConnectionFactory 
     FACTORY setUsername (RABBITMQ_USERNAME) 
     FACTORY setPassword (RABBITMQ_PASSWORD) 
     FACTORY setVirtualHost (RABBITMQ_VIRTUALHOST) 
     FACTORY setPort (RABBITMQ_PORT) 
     FACTORY setHost (RABBITMQ_HOST) 

    conn = FACTORY.newConnection 
     var channel: com.rabbitmq.client.Channel = conn.createChannel 

    //here to decare consumer for queue1 
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable) 
     channel.queueDeclare(QUEUE1, durable, false, false, null) 
     channel queueBind (QUEUE1, EXCHANGE_NAME, QUEUE1_ROUTING_KEY) 
     val queue1Consumer = new QueueingConsumer(channel) 
     channel basicConsume (QUEUE1, false, queue1Consumer) 

    //here to decare consumer for queue2 
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable) 
     channel.queueDeclare(QUEUE2, durable, false, false, null) 
     channel queueBind (QUEUE2, EXCHANGE_NAME, QUEUE2_ROUTING_KEY) 
     val queue2Consumer = new QueueingConsumer(channel) 
     channel basicConsume (QUEUE2, false, queue2Consumer) 





    //here u should mantion distinct ROUTING key for each queue 
     def addToQueueOne{ 
    channel.basicPublish(EXCHANGE_NAME, QUEUE1_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes) 
    } 

    def addToQueueTwo{ 
channel.basicPublish(EXCHANGE_NAME, QUEUE2_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes) 
} 

def getFromQueue1:Delivery={ 
queue1Consumer.nextDelivery 
} 

def getFromQueue2:Delivery={ 
    queue2Consumer.nextDelivery 
} 

}

でのサンプルコードを書いています、これらすべてのRabbitMQの操作を処理するシングルトンクラスを作成上記のようなキューを追加することができます........

+0

サンプルコードはありがとうございます。私は具体的に* 1つの*キューの最後に複数の消費者が欲しい。複数の消費者が* 1つのキューに登録できますか? – jkndrkn

+0

はい、キューに対して複数のコンシューマを宣言できます。 –

+0

コードの例を教えてください。^_^ – jkndrkn

0

パブサブスクリプションを取得することは簡単です。うまくいく。プロジェクトには今すぐ役に立つヘルプページがありますhttp://www.rabbitmq.com/getstarted.html

RabbitMQにはタイムアウトとreserntフラグがあります。

あなたはまた、10msごとにチェックすることを奨励されたイベント駆動型になることができます。これについて助けが必要な場合は、http://rabbitears.codeplex.com/という小さなプロジェクトがあり、少し助けになるかもしれません。

+0

ちょっとサイモン、リンクありがとう。 pub-subは、私たちが何をしているのかかなり分かりません。この例では、http://www.rabbitmq.com/tutorial-three-python.htmlでは、同じメッセージが複数のキューで複数のコンシューマに送信されるファンアウトのシナリオを説明しています。複数の消費者がメッセージの*単一インスタンスを消費する機会を得るために競争してもらいたい。 – jkndrkn

関連する問題