RabbitMQとSpring AMQPでパブリッシャの確認を使用し、リスナーがメッセージの処理中に例外をスローした場合、メッセージの確認コールバックがNACKを取得するようにします。Spring AMQPでエンドツーエンドのパブリッシャを確認する方法
this blog post後、私は赤でマークされたユースケースについて話しています:
主な質問です:私はConnectionFactoryの、RabbitTemplateとListenerContainerを設定しなければならないのか
手動のNACKを有効にするには?
例外の場合にメッセージを無効にして確認コールバックを
success = false
で呼び出すには、リスナーで何をしなければなりませんか?ここで
私の豆です:ここでは
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public ConfirmCallback confirmCallback() {
return new ConfirmCallbackTestImplementation();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setExchange(DIRECT_EXCHANGE);
return rabbitTemplate;
}
@Bean
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) {
Queue queue = queue(rabbitAdmin, exchange, "faultyListener");
FaultyMessageListener listener = new FaultyMessageListener();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(listener);
container.setQueues(queue);
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
return listener;
}
private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) {
Queue queue = new Queue(routingKey, true, false, true);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
return queue;
}
は私のリスナーの実装である:ここで
public class FaultyMessageListener implements ChannelAwareMessageListener {
private final List<Message> receivedMessages = new ArrayList<>();
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
receivedMessages.add(message);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
latch.countDown();
throw new AmqpException("Message could not be processed");
}
}
は私の確認コールバックです:
public static class ConfirmCallbackTestImplementation implements ConfirmCallback {
private volatile Map<String, Boolean> confirmations = new HashMap<>();
private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>();
@Override
public void confirm(CorrelationData correlationData, boolean success, String s) {
confirmations.put(correlationData.getId(), success);
expectationLatches.get(correlationData.getId()).countDown();
}
public CountDownLatch expect(String correlationId) {
CountDownLatch latch = new CountDownLatch(1);
this.expectationLatches.put(correlationId, latch);
return latch;
}
}
私は、以下を使用希望の行動を検証するためのテストまたは:
@Autowired
private RabbitTemplate template;
@Autowired
private FaultyMessageListener faultyListener;
@Autowired
private ConfirmCallbackTestImplementation testConfirmCallback;
@Test
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException {
String correlationId = "corr-data-test-2";
CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId);
template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId));
assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS));
confirmationLatch.await(1, TimeUnit.SECONDS);
assertThat(faultyListener.receivedMessages.size(), is(1));
assertThat(testConfirmCallback.confirmations.get(correlationId), is(false));
}
テストの結果は次のとおりです。最後のアサーションのため
java.lang.AssertionError:
Expected: is <false>
but: was <true>
。私の場合、これは確認コールバックがいつもsuccess = false
の代わりにsuccess = true
で呼び出され、私のリスナーのchannel.basicNack(...)
から何が期待されるかのように読まれます。