2011-03-07 10 views
2

次のシナリオがあります。Qpid - 異なるパフォーマンス特性を持つコンシューマへの負荷分散メッセージ

2人の消費者が共有するキューに100個のメッセージを送ります。両方のサブスクライバは、事前取得モードと明示モードでキューにサブスクライブします。各メッセージが処理された後、各サブスクライバはメッセージを受け入れてキューから削除します。コードが見える擬似ことを好き:メッセージが正しく負荷分散され

OnMessageTransfer(message) : 
    DoSomethingWithMessage(message) 
    Session.MessageAccept(message) 

を、各メッセージは一度だけ処理されたが、我々はそれを考慮に入れ、各消費者のための処理時間を取らないことを発見しています。たとえば、消費者Aがメッセージを処理するのに50ミリ秒かかり、消費者Bが5秒かかると仮定します。理想的には、消費者Bは1メッセージの処理を開始し、その間に消費者Aは99人を処理する必要があります。しかし、消費者Aは50秒で25のメッセージを実際に処理し、消費者Aは約4秒で75の他のメッセージを処理し、アイドルになります。クライアントのAPIはメッセージをプリフェッチしているようですが、この状況では明らかに最適ではありません。

この問題をどうやって解決できますか?

我々はQpidのCPP 0.5を使用していて、完全ではなく、CPPバインディングを#、C 0-10クライアントAPIを管理する(しかし、私の理解は、この動作は、APIの実装にリンクされていないということです)

よろしく、

Julien

答えて

2

この動作は、キューのメッセージフローを構成することによって変更できます。メッセージのプリフェッチを避けるには、1メッセージのクレジットに設定し、各メッセージの処理後に更新する必要があります。擬似コードは次のようになります。

InitSubscription(queue) : 
    MessageSubscribe(queue, AcceptMode.Explicit, AcquireMode.PreAcquired) 
    MessageSetFlowMode(queue, FlowMode.Credit) 
    MessageFlow(queue, CreditUnit.Byte, MAX_BYTES) 
    MessageFlow(queue, CreditUit.Message, 1) // will disable prefetch 

OnMessageTransfer(message) : 
    DoSomethingWithMessage(message) 
    MessageAccept(message) 
    MessageFlow(queue, CreditUit.Message, 1) // reissue a credit for 1 and only 1 message 
+0

グレート自己回答! –

関連する問題