私はSpringブートとActiveMQを使用しています。私はトピックからメッセージを送受信したい。これはうまくいきます。私のコードは次のようになります。SpringブートのトピックのDLQ
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {
JmsSpike.TestListener1.class,
JmsSpike.TestListener2.class,
JmsSpike.Config.class
})
@TestPropertySource(properties = {
"spring.activemq.broker-url: tcp://localhost:61616",
"spring.activemq.password: admin",
"spring.activemq.user: admin",
"spring.jms.pub-sub-domain: true", // queue vs. topic
})
@EnableJms
@EnableAutoConfiguration
public class JmsSpike {
@Autowired
private JmsTemplate jmsTemplate;
@Test
public void sendMessage() throws Exception {
sendMessageInThread();
Thread.sleep(10000);
}
private void sendMessageInThread() {
new Thread() {
public void run() {
jmsTemplate.convertAndSend("asx2ras", "I'm a test");
}
}.start();
}
@TestComponent
protected static class TestListener1 {
@JmsListener(destination = "asx2ras")
public void receiveMessage(String message) {
System.out.println("****************** 1 *******************");
System.out.println("Hey 1! I got a message: " + message);
System.out.println("****************** 1 *******************");
}
}
@TestComponent
protected static class TestListener2 {
@JmsListener(destination = "asx2ras")
public void receiveMessage(String message) {
throw new RuntimeException("Nope");
}
}
@Configuration
protected static class Config {
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setMaximumRedeliveries(1);
return topicPolicy;
}
@Bean
public ConnectionFactory connectionFactory(@Value("${spring.activemq.user}") final String username,
@Value("${spring.activemq.password}") final String password,
@Value("${spring.activemq.broker-url}") final String brokerUrl) {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(username, password, brokerUrl);
cf.setRedeliveryPolicy(redeliveryPolicy());
return cf;
}
}
}
- 私は両方のリスナーに
- 一つリスナーは常に動作し、ほんの一部のコンソールメッセージ
- に他のリスナーを出力しますメッセージを受け取るメッセージ
- を送ることができます
私はリトライを "1"に設定していますので、失敗したリスナーはexcの後にもう一度呼び出されますeptionが投げられた。ただし、再試行後、メッセージはエラーキュー(またはエラートピック)に配信されません。エラーが発生したリスナーを後でもう一度呼び出せるように、メッセージをエラーキューに送信するにはどうすればよいですか?
このトピックのすべてのリスナーではなく、失敗したリスナーだけをもう一度呼び出す必要があることに注意してください。それは可能ですか?
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
ここに、activemq.xmlのブローカ構成を投稿できますか?永続性を使用していますか?公式の文書から: "デフォルトでは、ActiveMQは配信不能な非永続メッセージをデッドレターキューに配置しません。 –
@IulianRoscaもちろん。私は私の質問を更新しました。 –
更新いただきありがとうございます。この時点で私がやろうとしているのは、activeMq.xmlの中でmaximumRedeliveriesを1に設定することです。デフォルトでは、値は6(http://activemq.apache.org/redelivery-policy.html)に設定されています。この構成の後でメッセージがDLQで終了する場合、クライアントがブローカー再配信ポリシーを無効にするときにメカニズムに問題があることを意味します。デバッグロギングを有効にすると、折衝された値(WireFormat)が表示されます。 –