2016-04-01 13 views
1

リトライ機能(コンシューマに問題がある場合)を3回実行してから、メッセージを別のキューに移動する必要があります(デッドレター交換)。私は以下のようにキュー/交換を設定しているRabbitMQのアドバイスチェーンが動作しない

定期的なメッセージ交換名:test_dlq_exchange デッドメッセージキュー:TEST_QUEUEという TEST_QUEUEというのバインドが

デッド・レター交換名をキーTEST_QUEUEというのルーティングとtest_exchangeする: メッセージ・キューがtest_exchange test_dlq_queue RabbitMQのUIコンソールでキーtest_dlq_queue

をルーティングとtest_dlq_exchangeするtest_dlq_queueバインド、私は "test_exchange"

として "X-デッドレター交換" を設定しています以下

は私のカスタムメッセージリスナーはちょうど私のMessageObjectを持っているメッセージドリブンPOJOであるSimpleMessageListenerContainer

@Bean 
    SimpleMessageListenerContainer getMessageListenerContainer(){ 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("test_queue"); 

    MessageListenerAdapter adapter = new MessageListenerAdapter(); 
    //configured my message listener class 
    //configured Jackson2JsonMessageConverter as converter 
    container.setMessageListener(adapter); 
    container.setAdviceChain(new Advice[] {retryAdvice()} 
    return container; 
    } 

    //configuration for retryAdvice 

    @Bean 
    public MethodInterceptor retryAdvice{ 

     ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy(); 
     backoffPolicy.setInitialInterval(10); 
     backoffPolicy.setMaxInterval(1000); 
     backoffPolicy.setMultiplier(2); 
     RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory()); 
     retryTemplate.setQueue("test_dl_queue"); 
     return RetryInterceptorBuilder 
        .stateful() 
        .backOffPolicy(backoffPolicy) 
        .maxAttempts(3) 
        .recoverer(
         new RepublishMessageRecoverer 
         (retryTemplate,"test_dl_exchange","test_queue")).build(); 
    } 

ためのコードです。

私はステートフルを使用しているので、createMessageIds(true)を有効にしました。私のメッセージリスナーでは、私はtargetobjectのメソッドを再度呼び出しています。コンテナの起動後、フローは循環的に進行します

つまり、メッセージをキューにパブリッシュします。 - >メッセージリスナーは、ある例外に基づいてターゲットメソッドを呼び出します。>再びメッセージをキューにパブリッシュします。>メッセージリスナーの呼び出しのターゲットメソッド等デッドレター交換/キューにメッセージをプッシュして無限ループに入りません。

は私が

 o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee) 

次のように参照ログではいくつかのいずれかがこの問題を解決するために私を助けることができますか?

答えて

0

RepublishMessageRecovererを使用しているため、ブローカでDLX/DLQを設定すると何も起こりません。 DLQにルーティングするには、RejectAndDontRequeueRecoverが必要です。リトライが完了すると、メッセージはDLX/DLQに送信されます。

あなたが同じキューしたがって

(retryTemplate,"test_dl_exchange","test_queue")) 

に無限ループを再発行しているように見えます。

+0

ありがとうございました。 RejectAndDontRequeueRecovererをインクルードした後で正常に動作します。最大再試行に達した後、メッセージをDLQに移動します。 – Raja

関連する問題