2017-07-03 18 views
1
channel.basicQos(1); 

while (true) { 
    GetResponse res = channel.basicGet(TEST_QUEUE, false); 
    if (res != null) { 
     deliveryTag = res.getEnvelope().getDeliveryTag(); 
    } 

    // Handle all messages If the condition is true 
    if (condition) { 
     // nack all messages unhandled previously 
     channel.basicNack(deliveryTag - 1, true, true); 

     // ack current message only 
     channel.basicAck(deliveryTag, false); 
    } 
    else { 
     // Do not handle current message and continue to get next one 
    }   
} 

Q1。
私は、nackとackの両方を同時に使用できるかどうかはわかりません。
deliveryTag - 1を使用して以前のすべてのメッセージを指定できますか?現在のメッセージの前にすべてのメッセージとAckの現在のメッセージをナックします(rabbitmq、java)

つまり、if条件を満たさないメッセージはすべてスキップします。
現在のメッセージが条件を満たす場合は、すべてのスキップされたメッセージをnackし、現在のメッセージを確認します。
これを行うことで、特定のメッセージの処理を遅らせたいと考えています。

Q2。
私はwhile(true)と書くと、複数のワーカーが動作している場合、channel.basicQos(1)は期待どおりに動作しません。
カウントを制限するためにこのようなコードを書くべきでしょうか?または他のすべてのワーカーがメッセージを均等に取得できるようにするにはどうすればよいですか?

int prefetch = 1; 
int count = 0; 
while (count++ <= prefetch) { 
} 

Q3。
私は気づいたワーカープログラムは、接続が開いている限り、終了しません。
接続はどれくらい長くなりますか。手動で閉じる必要がありますか?

最後に、1は、この場合(MessageListenerの(ChannelAwareMessageListener)モデルを使用していない)で、より適しているRabbitTemplate対AmqpTemplate対 のRabbitMQ JavaクライアントAPI?

答えて

0

Q1 - 正常に動作するはずです。それを試して問題を見つけましたか?はい、タグは各配送に対して増分されます。

Q2 - basicQosは、basicGet()とは関係がありません。basicConsume()でのみ使用されています。

Q3 - 完了すると接続を閉じる必要があります。

最後に、場合によります。 Springのより高いレベルのサポート(メッセージ変換など)が必要な場合は、それを使用してください。生データAPIを扱う場合は、Springを使用しないでください。

RabbitTemplateは、executeメソッド(チャネルコールバックあり)を除き、ユーザー管理のacks/nacksを使用してbasicGetを直接サポートしていません。

関連する問題