2016-10-15 1 views
0

RabbitMQとSpring AMQPでパブリッシャの確認を使用し、リスナーがメッセージの処理中に例外をスローした場合、メッセージの確認コールバックがNACKを取得するようにします。Spring AMQPでエンドツーエンドのパブリッシャを確認する方法

this blog post後、私は赤でマークされたユースケースについて話しています:

enter image description here

主な質問です:私はConnectionFactoryの、RabbitTemplateとListenerContainerを設定しなければならないのか

  1. 手動のNACKを有効にするには?

  2. 例外の場合にメッセージを無効にして確認コールバックをsuccess = falseで呼び出すには、リスナーで何をしなければなりませんか?ここで

私の豆です:ここでは

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); 
    connectionFactory.setPublisherConfirms(true); 
    return connectionFactory; 
} 

@Bean 
public ConfirmCallback confirmCallback() { 
    return new ConfirmCallbackTestImplementation(); 
} 

@Bean 
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) { 
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
    rabbitTemplate.setConfirmCallback(confirmCallback); 
    rabbitTemplate.setExchange(DIRECT_EXCHANGE); 
    return rabbitTemplate; 
} 

@Bean 
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) { 
    Queue queue = queue(rabbitAdmin, exchange, "faultyListener"); 
    FaultyMessageListener listener = new FaultyMessageListener(); 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
    container.setMessageListener(listener); 
    container.setQueues(queue); 
    container.setDefaultRequeueRejected(false); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    container.start(); 
    return listener; 
} 

private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) { 
    Queue queue = new Queue(routingKey, true, false, true); 
    rabbitAdmin.declareQueue(queue); 
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey)); 
    return queue; 
} 

は私のリスナーの実装である:ここで

public class FaultyMessageListener implements ChannelAwareMessageListener { 

    private final List<Message> receivedMessages = new ArrayList<>(); 

    private final CountDownLatch latch = new CountDownLatch(1); 

    @Override 
    public void onMessage(Message message, Channel channel) throws Exception { 
     receivedMessages.add(message); 
     channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 
     latch.countDown(); 
     throw new AmqpException("Message could not be processed"); 
    } 

} 

は私の確認コールバックです:

public static class ConfirmCallbackTestImplementation implements ConfirmCallback { 

    private volatile Map<String, Boolean> confirmations = new HashMap<>(); 
    private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>(); 

    @Override 
    public void confirm(CorrelationData correlationData, boolean success, String s) { 
     confirmations.put(correlationData.getId(), success); 
     expectationLatches.get(correlationData.getId()).countDown(); 
    } 

    public CountDownLatch expect(String correlationId) { 
     CountDownLatch latch = new CountDownLatch(1); 
     this.expectationLatches.put(correlationId, latch); 
     return latch; 
    } 

} 

私は、以下を使用希望の行動を検証するためのテストまたは:

@Autowired 
private RabbitTemplate template; 

@Autowired 
private FaultyMessageListener faultyListener; 

@Autowired 
private ConfirmCallbackTestImplementation testConfirmCallback; 

@Test 
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException { 
    String correlationId = "corr-data-test-2"; 
    CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId); 

    template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId)); 

    assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS)); 
    confirmationLatch.await(1, TimeUnit.SECONDS); 

    assertThat(faultyListener.receivedMessages.size(), is(1)); 
    assertThat(testConfirmCallback.confirmations.get(correlationId), is(false)); 
} 

テストの結果は次のとおりです。最後のアサーションのため

java.lang.AssertionError: 
    Expected: is <false> 
     but: was <true> 

。私の場合、これは確認コールバックがいつもsuccess = falseの代わりにsuccess = trueで呼び出され、私のリスナーのchannel.basicNack(...)から何が期待されるかのように読まれます。

答えて

2

このようには機能しません。パブリッシャー側のack/nackは、ブローカーがメッセージを受け入れたかどうかだけです。実際、ナックはブローカー自体の問題を意味するのでほとんど返されません - the rabbit documentationを参照してください。

basic.nackは、キューを担当するErlangプロセスで内部エラーが発生した場合にのみ配信されます。

は、同様に、消費者側のACK/NACKは、消費者がメッセージの責任を受け入れたかどうかについては、純粋で、NACKメッセージは、再キューイング、廃棄、またはデッドレターキューにルーティングすることができます。

メッセージが公開されると、コンシューマからのパブリッシャへの通信はありません。このような通信が必要な場合は、応答キューを設定する必要があります。

パブリッシャーとコンシューマーの間の密結合を望む場合は、代わりにSpring Remoting (RPC) Over RabbitMQを使用できます。コンシューマが例外をスローすると、パブリッシャに伝播されますが、そのメカニズムはJava Serializableオブジェクトのみをサポートします。

ドキュメントではXMLを参照していますが、プロキシとサービスの呼び出し元を@Bean s

関連する問題