私はspring-amqpが初めてです。 auto-ackを使用する代わりに、手動でメッセージを確認しようとしています。Spring-amqp - サーバを閉じるまで、キュー内の最後のメッセージは未確認のままです。
私は、最後のメッセージが管理コンソールで解凍されているのを見ています。
image for unacked message in managemnet console. キューは空です。
サーバーを停止すると、最後のメッセージが確認されます。どうすれば対応できますか?未確認のメッセージID /情報をログに記録するにはどうすればいいですか。
これは私が実装したコードです。
RabbitConfig.java:
:パブリッククラスRabbitMQConfig {
final static String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, true,false,false,null);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean
Consumer receiver() {
return new Consumer();
}
@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
Consumer.java
パブリッククラスの消費者はChannelAwareMessageListener {
@RabbitListener(queues = "spring-boot")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException, InterruptedException {
Thread.sleep(500);
channel.basicAck(tag, true);
System.out.println(tag + "received");
}
@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
// TODO Auto-generated method stub
}
プロデューサーエンドポイントを実装します
@RestController パブリッククラスHelloController {
private final RabbitTemplate rabbitTemplate;
public HelloController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// Call this end point from the postman or the browser then check in the
// rabbitmq server
@GetMapping(path = "/hello")
public String sayHello() throws InterruptedException {
// Producer operation
for (int i = 0; i < 100; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World");
}
return "hello";
}
@GetMapping(path = "/hellotwo")
public String sayHellotwo() throws InterruptedException {
// Producer operation
for (int i = 0; i < 50; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message");
}
return "hellotwo";
}
ありがとうございました。解決策は私のために働いた。 私は何らかの理由で消費者が死亡し、メッセージが再キューされてもメッセージが失われないことを確認するために手動確認を使用しています。 – Diva04
これは不要です。 - AUTO ACKモードは、rabbitmq auto-ackネイティブではありません(NONEです)。 AUTOを使用すると、コンテナはリスナーが正常に終了した場合にのみメッセージを確認します。例外がスローされた場合、メッセージをnackします。消費者が死亡した場合、ブローカは自動的にメッセージを再キューに入れます。 –