2017-03-23 6 views
0

私はSpringブートとActiveMQを使用しています。私はトピックからメッセージを送受信したい。これはうまくいきます。私のコードは次のようになります。SpringブートのトピックのDLQ

@RunWith(SpringRunner.class) 
@SpringBootTest(classes = { 
     JmsSpike.TestListener1.class, 
     JmsSpike.TestListener2.class, 
     JmsSpike.Config.class 
}) 
@TestPropertySource(properties = { 
     "spring.activemq.broker-url: tcp://localhost:61616", 
     "spring.activemq.password: admin", 
     "spring.activemq.user: admin", 
     "spring.jms.pub-sub-domain: true", // queue vs. topic 
}) 
@EnableJms 
@EnableAutoConfiguration 
public class JmsSpike { 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    @Test 
    public void sendMessage() throws Exception { 
     sendMessageInThread(); 
     Thread.sleep(10000); 
    } 

    private void sendMessageInThread() { 
     new Thread() { 
      public void run() { 
       jmsTemplate.convertAndSend("asx2ras", "I'm a test"); 
      } 
     }.start(); 
    } 

    @TestComponent 
    protected static class TestListener1 { 

     @JmsListener(destination = "asx2ras") 
     public void receiveMessage(String message) { 
      System.out.println("****************** 1 *******************"); 
      System.out.println("Hey 1! I got a message: " + message); 
      System.out.println("****************** 1 *******************"); 
     } 
    } 

    @TestComponent 
    protected static class TestListener2 { 

     @JmsListener(destination = "asx2ras") 
     public void receiveMessage(String message) { 
      throw new RuntimeException("Nope"); 
     } 
    } 

    @Configuration 
    protected static class Config { 

     @Bean 
     public RedeliveryPolicy redeliveryPolicy() { 
      RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); 
      topicPolicy.setMaximumRedeliveries(1); 
      return topicPolicy; 
     } 

     @Bean 
     public ConnectionFactory connectionFactory(@Value("${spring.activemq.user}") final String username, 
                @Value("${spring.activemq.password}") final String password, 
                @Value("${spring.activemq.broker-url}") final String brokerUrl) { 

      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(username, password, brokerUrl); 
      cf.setRedeliveryPolicy(redeliveryPolicy()); 
      return cf; 
     } 
    } 
} 
  • 私は両方のリスナーに
  • 一つリスナーは常に動作し、ほんの一部のコンソールメッセージ
  • に他のリスナーを出力しますメッセージを受け取るメッセージ
  • を送ることができます

私はリトライを "1"に設定していますので、失敗したリスナーはexcの後にもう一度呼び出されますeptionが投げられた。ただし、再試行後、メッセージはエラーキュー(またはエラートピック)に配信されません。エラーが発生したリスナーを後でもう一度呼び出せるように、メッセージをエラーキューに送信するにはどうすればよいですか?

このトピックのすべてのリスナーではなく、失敗したリスナーだけをもう一度呼び出す必要があることに注意してください。それは可能ですか?

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> 

    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
      <policyEntry topic=">" > 
       <pendingMessageLimitStrategy> 
       <constantPendingMessageLimitStrategy limit="1000"/> 
       </pendingMessageLimitStrategy> 
      </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 

    <managementContext> 
     <managementContext createConnector="false"/> 
    </managementContext> 

    <persistenceAdapter> 
     <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

     <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage percentOfJvmHeap="70" /> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="100 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="50 gb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

    <transportConnectors> 
     <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    </transportConnectors> 

    <shutdownHooks> 
     <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
    </shutdownHooks> 

</broker> 
+0

ここに、activemq.xmlのブローカ構成を投稿できますか?永続性を使用していますか?公式の文書から: "デフォルトでは、ActiveMQは配信不能な非永続メッセージをデッドレターキューに配置しません。 –

+0

@IulianRoscaもちろん。私は私の質問を更新しました。 –

+0

更新いただきありがとうございます。この時点で私がやろうとしているのは、activeMq.xmlの中でmaximumRedeliveriesを1に設定することです。デフォルトでは、値は6(http://activemq.apache.org/redelivery-policy.html)に設定されています。この構成の後でメッセージがDLQで終了する場合、クライアントがブローカー再配信ポリシーを無効にするときにメカニズムに問題があることを意味します。デバッグロギングを有効にすると、折衝された値(WireFormat)が表示されます。 –

答えて

1

official documentationによると、あなたは、クライアント側のブローカーの設定を上書きすることができます:


EDIT

は、ここに私のactivemq.xml(ちょうどbrokerタグ)です

ブローカは、デフォルトの配信ポリシーを、彼のBrokerInfoコマンドパケットのクライアント接続。しかし、クライアントは ActiveMQConnection.getRedeliveryPolicy()メソッドを使用して、ポリシー設定を上書きすることができます

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

だから、あなたは再配信ポリシーを設定している方法はOKらしいです。

私が見るのは、RedeliveryPolicyの新しいインスタンスを作成し、単一のフィールドのみを設定する場合です。topicPolicy.setMaximumRedeliveries(1);他のすべてのフィールドにはデフォルト値が割り当てられます。 @JmsListenerを使用してCLIENT_ACKNOWLEDGEを使用していないことを確認し、また編集

RedeliveryPolicy policy = cf.getRedeliveryPolicy(); 
policy.setMaximumRedeliveries(1); 

:あなたはおそらく、再配信ポリシーの既存のインスタンスの最大再配信を設定する必要があります。このthreadによれば、CLIENT_ACKNOWLEDGEを使用すると、メッセージは再配信されません。

関連する問題