2016-09-16 15 views
5

Spring AMQPの新機能です。私は消費者である他のアプリケーションにメッセージを送信するプロデューサであるアプリケーションを持っています。Spring AMQPでAckまたはNackを使用する方法

消費者がメッセージを受信すると、データの検証を行います。

データが適切であれば、ACKとメッセージをキューから削除する必要があります。 データが不適切な場合は、RabbitMQに再キューイングされるようにデータをNACK(Negative Acknowledge)する必要があります。

私は私がベースのメッセージを確認します

しかし、私の場合(例外が発生したときにそれがメッセージをキューに再登録されます)

**factory.setDefaultRequeueRejected(true);**

(これは、すべてのメッセージをキューに再登録しません)

**factory.setDefaultRequeueRejected(false);**に出くわしました検証時に次に、メッセージを削除する必要があります。 NACKがメッセージを再キューした場合。

私はAMQP仕様は、クライアントがそれらを破棄するか、それらを

方法をキューに再登録するか、ブローカーを指示する、メッセージを配信、個人を拒否することができますbasic.rejectメソッドを定義して

RabbitMQのウェブサイトで読んだことがあります上記のシナリオを達成するために?いくつかの例を教えてください。

Iは小さなプログラム

 logger.info("Job Queue Handler::::::::::" + new Date()); 
     try { 

     }catch(Exception e){ 

      logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::"); 

     } 

     factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{ 
      return cause instanceof XMLException; 
     })); 

メッセージは異なる例外の factory.setDefaultRequeueRejected(真)

09イング再ないしようとした:46:38854 ERROR [標準エラー](SimpleAsyncTaskExecutor -1) org.activiti.engine.ActivitiObjectNotFoundException:キー 'WF89012'でデプロイされたプロセスがありません

09:46:39102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1)エラー・キューから受信した:{ERROR = ないJPAトランザクションをコミットでした。ネストされた例外は javax.persistence.RollbackExceptionがある:rollbackOnly としてマークされたトランザクション}

答えて

4

the documentationを参照してください。

デフォルトでは、コンテナは、リスナが正常に終了した場合にメッセージを確認し(削除される)、リスナが例外をスローした場合には拒否(および再キューールド)します。

リスナー(またはエラーハンドラ)AmqpRejectAndDontRequeueExceptionをスローした場合、デフォルトの動作が上書きされ、メッセージは廃棄(又はそのように構成されている場合DLX/DLQにルーティングされる) - コンテナはbasicReject(false)代わりにbasicReject(true)を呼び出します。

したがって、検証が失敗した場合はAmqpRejectAndDontRequeueExceptionを投げてください。または、例外をAmqpRejectAndDontRequeueExceptionに変換するカスタムエラーハンドラでリスナを設定します。

これは、this answerに記載されています。

あなたが本当に、自分を肯定応答(ACK)の責任を取るMANUALに確認応答モードを設定し、@RabbitListenerを使用している場合ChannelAwareMessageListenerまたはthis techniqueを使用したい場合。

しかし、ほとんどの人は、コンテナに物事の世話をさせるだけです(彼らは何が起こっているのかを理解した後)。一般的に、マニュアルアクノレッジを使用するのは、アクセルを遅らせる、早期にアクセルを掛けるなどの特別な場合です。

EDIT

私は(今固定)することができ指摘答えに誤りがありました。 ListenerExecutionFailedExceptionの原因を調べる必要があります。私はこれをテストし、それが期待...

@SpringBootApplication 
public class So39530787Application { 

    private static final String QUEUE = "So39530787"; 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend(QUEUE, "foo"); 
     template.convertAndSend(QUEUE, "bar"); 
     template.convertAndSend(QUEUE, "baz"); 
     So39530787Application bean = context.getBean(So39530787Application.class); 
     bean.latch.await(10, TimeUnit.SECONDS); 
     System.out.println("Expect 1 foo:" + bean.fooCount); 
     System.out.println("Expect 3 bar:" + bean.barCount); 
     System.out.println("Expect 1 baz:" + bean.bazCount); 
     context.close(); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setErrorHandler(new ConditionalRejectingErrorHandler(
       t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException)); 
     return factory; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue(QUEUE, false, false, true); 
    } 
    private int fooCount; 

    private int barCount; 

    private int bazCount; 

    private final CountDownLatch latch = new CountDownLatch(5); 

    @RabbitListener(queues = QUEUE) 
    public void handle(String in) throws Exception { 
     System.out.println(in); 
     latch.countDown(); 
     if ("foo".equals(in) && ++this.fooCount < 3) { 
      throw new FooException(); 
     } 
     else if ("bar".equals(in) && ++this.barCount < 3) { 
      throw new BarException(); 
     } 
     else if ("baz".equals(in)) { 
      this.bazCount++; 
     } 
    } 

    @SuppressWarnings("serial") 
    public static class FooException extends Exception { } 

    @SuppressWarnings("serial") 
    public static class BarException extends Exception { } 

} 

結果として動作します。説明のため

Expect 1 foo:1 
Expect 3 bar:3 
Expect 1 baz:1 
+0

感謝。私はいくつかのプログラムを試しました。間違いを是正していただけますか? – Chandan

+0

エラーハンドラから常に 'AmqpRejectAndDontRequeueException'を投げるのは意味がありません。そのためには、 'default RequeueRejected = false'を設定するだけです。 [この回答](http://stackoverflow.com/questions/39509745/negative-acknowledgement-to-rabbitmq-queue-to-requeue-the-message-using-spring/39511357#39511357)のテクニックを使用して'XMLException'のためだけにそれをスローします - あなたのリスナーはそれを再スローする必要があります。 –

+0

私の答えに編集を参照してください。 –

関連する問題