初心者からRabbitMQ、Javaで初めてです。Spring ChannelAwareMessageListenerを使用しているときにRabbitMQ Consumer Cancellation Notificationを処理するにはどうすればいいですか?
私は、手動でacksを使用し、消費者の取り消し通知をjava Spring AMQP抽象化を使用して処理するリスナーを作成しようとしています。 Springの抽象化を使用して両方のタスクを達成できますか?
メッセージをキューから取り出し、そのメッセージを処理するリスナーを作成したいとします(データベースなどに書き込むかもしれません)。私は、メッセージの処理が失敗した場合、または何らかの理由で完了できない場合には、肯定応答を使用して拒否し再キューできるようにしました。これまでのところ、私は、手動でack/nack /拒否するには、Spring AMQPを使用してChannelAwareMessageListener
を使用する必要があることが分かったと思います。
私はRabbitMQからConsumer Cancellation Notificationsを処理する必要があることを認識していますが、ChannelAwareMessageListener
を使用すると、これをコード化する方法は実際にはありません。私がCCNを処理するために見る唯一の方法は、より低いレベルのJavaクライアントAPIを使用してchannel.basicConsume()
を呼び出し、メッセージの配信とキャンセルを処理する新しいDefaultConsumer
インスタンスを渡すことです。
ConnectionFactory
にclientProperties
を設定する方法もわかりません(ブローカーに私がCCNを処理できることを伝えるため)。なぜなら設定でBeanからファクトリを取得しているからです。
リスナーの擬似コードとコンテナの作成は以下のとおりです。あなたは(これのConnectionFactoryを仮定すると、RabbitMQのJavaライブラリからのオブジェクトである)ConnectionFactory
でsetClientProperties
メソッドを使用する必要があり、クライアントのプロパティを設定するための
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}