2016-11-27 14 views
0

私はspring-amqpが初めてです。 auto-ackを使用する代わりに、手動でメッセージを確認しようとしています。Spring-amqp - サーバを閉じるまで、キュー内の最後のメッセージは未確認のままです。

私は、最後のメッセージが管理コンソールで解凍されているのを見ています。

image for unacked message in managemnet console. キューは空です。

サーバーを停止すると、最後のメッセージが確認されます。どうすれば対応できますか?未確認のメッセージID /情報をログに記録するにはどうすればいいですか。

これは私が実装したコードです。

RabbitConfig.java:

パブリッククラスRabbitMQConfig {

final static String queueName = "spring-boot"; 

@Bean 
Queue queue() { 
    return new Queue(queueName, true,false,false,null); 
} 

@Bean 
TopicExchange exchange() { 
    return new TopicExchange("spring-boot-exchange"); 
} 

@Bean 
Binding binding(Queue queue, TopicExchange exchange) { 
    return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
             MessageListenerAdapter listenerAdapter) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames(queueName); 
    container.setMessageListener(listenerAdapter); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    return container; 
} 

@Bean 
Consumer receiver() { 
    return new Consumer(); 
} 

@Bean 
MessageListenerAdapter listenerAdapter(Consumer receiver) { 
    return new MessageListenerAdapter(receiver, "receiveMessage"); 
} 

Consumer.java

パブリッククラスの消費者はChannelAwareMessageListener {

@RabbitListener(queues = "spring-boot") 
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) 
     throws IOException, InterruptedException { 
    Thread.sleep(500); 
    channel.basicAck(tag, true); 
    System.out.println(tag + "received"); 
} 

@Override 
public void onMessage(Message arg0, Channel arg1) throws Exception { 
    // TODO Auto-generated method stub 

} 

プロデューサーエンドポイントを実装します

@RestController パブリッククラスHelloController {

private final RabbitTemplate rabbitTemplate; 

public HelloController(RabbitTemplate rabbitTemplate) { 
    this.rabbitTemplate = rabbitTemplate; 

} 

// Call this end point from the postman or the browser then check in the 
// rabbitmq server 
@GetMapping(path = "/hello") 
public String sayHello() throws InterruptedException { 
    // Producer operation 
    for (int i = 0; i < 100; i++) { 
     Thread.sleep(500); 
     rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World"); 
    } 
    return "hello"; 
} 

@GetMapping(path = "/hellotwo") 
public String sayHellotwo() throws InterruptedException { 
    // Producer operation 
    for (int i = 0; i < 50; i++) { 
     Thread.sleep(500); 
     rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message"); 

    } 
    return "hellotwo"; 
} 

答えて

0

次の2個のリスナーコンテナを持っています。 container beanと@RabbitListenerのフレームワークによって作成されたものです。

私は、テストを自分自身を実行せずに何が起こっているか全くわからないが、私はこの問題は、単純なMessageListenerAdapterからreceiveMessageを呼び出すためにあなたの試みであると思います。

このアダプタは、1つの引数(Messageから変換された)を持つメソッドを呼び出すように設計されています。また、そのアダプターは@Headerパラメーターをどのようにマップするのかわかりません。私は配達が失敗したと思うし、MANUAL acksを使用しているので、unack'd配達とデフォルトのqos(1)のために、そのコンテナへの配達はもう試行されません。

container beanは必要ありません。代わりに、Ackモードを設定するようにメッセージリスナーコンテナファクトリを設定します。 the documentationを参照してください。

spring-amqpを初めてお使いの方は、なぜあなたは手作りの悩みが必要だと思いますか?デフォルトモード(自動)は、コンテナがあなたのためにack/nackすることを意味します(NONEは伝統的なウサギの自動応答です)。 Springで手動のackを使用するのは一般的ではありません。

+0

ありがとうございました。解決策は私のために働いた。 私は何らかの理由で消費者が死亡し、メッセージが再キューされてもメッセージが失われないことを確認するために手動確認を使用しています。 – Diva04

+0

これは不要です。 - AUTO ACKモードは、rabbitmq auto-ackネイティブではありません(NONEです)。 AUTOを使用すると、コンテナはリスナーが正常に終了した場合にのみメッセージを確認します。例外がスローされた場合、メッセージをnackします。消費者が死亡した場合、ブローカは自動的にメッセージを再キューに入れます。 –

関連する問題