2012-01-24 12 views
7

私はRabbitMqをより普遍的な方法で使用しようとしています(ただし、必要に応じてこの時点で他のメッセージキューの実装を選択できます)。 Rabbitプッシュメッセージを私の消費者に残す代わりに、消費者はキューに接続し、N個のメッセージのバッチを取り込みます(その間にいくつかの拒否を消費します)。その後、別のキューにジャンプします。これは冗長性のために行われます。一部の消費者がクラッシュすると、すべてのメッセージが他の消費者によって消費されることが保証されます。RabbitMqでロックとバッチフェッチメッセージ

問題は複数のコンシューマがいるため、同じキューで競合したくないということです。キューのロックを保証する方法はありますか?そうでない場合は、少なくとも2人の消費者が同じキューに接続しても同じメッセージを読まないことを確認できますか?トランザクションはある程度助けになるかもしれませんが、RabbitMQから削除されるという話を聞いたことがあります。

その他の建築上の提案も歓迎されています。

ありがとうございます!

編集: コメントに指摘されているように、メッセージをどのように処理する必要があるのか​​という点には特筆すべき点があります。それらはグループ内でのみ意味をなさないので、関連するメッセージがキューにまとめられている可能性が高いです。たとえば、100個のメッセージのバッチをプルすると、メッセージ1~3,4~5,6~10などで何かできることがあります。もし私がいくつかのメッセージのグループを見つけられなかったらそれらをキューに再送信します。 WorkQueueは、同じグループのメッセージを複数のワーカーに伝播させるため、機能しません。

+0

明らかに消費者は、これは不可能ですが、私は興味があった場合には、[ゴシップ](http://en.wikipedia.org/wiki/Gossip_protocol)のようなものを使用してそれらの間で同期することができます... –

答えて

6

Enterprise Integration Patternsで無料のオンラインブックを見ましたか?

メッセージがワーカーに届く前に、バッチャーコンポーネントが必要なワークフローが本当に必要なように聞こえます。 RabbitMQには2つの方法があります。バッチ処理を実行できる交換タイプ(およびメッセージフォーマット)を使用するか、キューを1つ持ち、バッチをソートして各バッチを独自のキューに配置するワーカーを使用します。バッターは、作業員が新しいバッチ・キューの存在を知ることができるように、制御キューに「バッチ準備完了」メッセージを送るべきである。バッチが処理されると、ワーカーはバッチキューを削除できます。

メッセージフォーマットを管理している場合、RabbitMQに暗黙のうちにいくつかの方法でバッチ処理を実行させることができます。トピック交換では、各メッセージのルーティングキーがwork.batchid.somethingの形式であることを確認することができ、バッチxxyzzの存在を知る作業者は#.xxyzz。#のようなバインディングキーのみを使用しますそれらのメッセージを消費する。再発行は必要ありません。

もう1つの方法は、バッチIDをヘッダーに組み込み、新しいヘッダー交換タイプを使用することです。少数のErlangコードを書く場合は、独自のカスタム交換タイプを実装することもできます。

ほとんどの人が始める典型的なワーカーキューの概念よりも優れたメッセージングアーキテクチャの概要を示すので、この本をチェックすることをお勧めします。

+0

これは私が欲しいものと同じように聞こえる。 Thans! –

+0

バッチでメッセージをプルするには、チャネル上でprefetch.countを使用することを検討する必要があります。 http://www.rabbitmq.com/consumer-prefetch.html –

2

コンシューマには1つのキューからの取り出しがあります。彼らはメッセージを共有しないことが保証されます(Rabbitは現在接続されているコンシューマ間でメッセージをラウンドロビンします)、その正確な使用パターンに大きく最適化されています。

すぐに使用できる、すぐに使用できます。 RabbitMQドキュメントではWork Queueモデルと呼ばれています。 1つのキュー、複数のコンシューマ、何も共有していないキュー。それはあなたが必要とするように聞こえる。

+0

残念ながら、私はメッセージをどのように処理する必要があるかという特殊性のために、それらはグループ内でのみ意味をなさないので、関連するメッセージがキューにまとめられている可能性が高いです。たとえば、100個のメッセージのバッチをプルすると、メッセージ1~3,4~5,6~10などで何かできることがあります。もし私がいくつかのメッセージのグループを見つけられなかったらそれらをキューに再送信します。 WorkQueueは、同じグループのメッセージを複数のワーカーに伝播させるため、機能しません。 –

+0

これは非常に重要な要件です。他の人がそれを上に見るように質問に追加する必要があります。なぜ、それらのグループが論理的に関連しているように聞こえるので、プロデューサー側であなたのメッセージをまとめるのではなく、異種のメッセージに分割するのではなく、原子単位として取り上げるべきですか?そうすれば、仲介業者の技術間で簡単に移動することも可能になります。私はあなたが必要とするものをあなたに提供するブローカーを知らないので、 –

+0

あなたは正しいです。私は上記の要件を指摘した。あなたの質問に関して、私はプロデューサー側でそれらをまとめることができればと思っていますが、グループ内の各メッセージは異なるマシン上にある異なるプロデューサーを介して来るかもしれません。 –

0

チャネル/消費者レベルのプリフェッチカウントを設定して、メッセージをバッチで消費することができます。メッセージを再送信するには、basic.reject AMQPメソッドを使用する必要があります。これらのメッセージは、キューに入れるか、デッドレターキューに転送するかを選択できます。同じキューからメッセージを取得しようとする複数のコンシューマは、AMQPのbasic.getメソッドが同時コンシューマを処理するために同期されるため、問題はありません。

https://groups.google.com/forum/#!topic/rabbitmq-users/hJ8f5du-GCA

+0

http://www.rabbitmq.com/consumer-prefetch.html –

関連する問題