2013-06-12 4 views
9

初心者からRabbitMQ、Javaで初めてです。Spring ChannelAwareMessageListenerを使用しているときにRabbitMQ Consumer Cancellation Notificationを処理するにはどうすればいいですか?

私は、手動でacksを使用し、消費者の取り消し通知をjava Spring AMQP抽象化を使用して処理するリスナーを作成しようとしています。 Springの抽象化を使用して両方のタスクを達成できますか?

メッセージをキューから取り出し、そのメッセージを処理するリスナーを作成したいとします(データベースなどに書き込むかもしれません)。私は、メッセージの処理が失敗した場合、または何らかの理由で完了できない場合には、肯定応答を使用して拒否し再キューできるようにしました。これまでのところ、私は、手動でack/nack /拒否するには、Spring AMQPを使用してChannelAwareMessageListenerを使用する必要があることが分かったと思います。

私はRabbitMQからConsumer Cancellation Notificationsを処理する必要があることを認識していますが、ChannelAwareMessageListenerを使用すると、これをコード化する方法は実際にはありません。私がCCNを処理するために見る唯一の方法は、より低いレベルのJavaクライアントAPIを使用してchannel.basicConsume()を呼び出し、メッセージの配信とキャンセルを処理する新しいDefaultConsumerインスタンスを渡すことです。

ConnectionFactoryclientPropertiesを設定する方法もわかりません(ブローカーに私がCCNを処理できることを伝えるため)。なぜなら設定でBeanからファクトリを取得しているからです。

リスナーの擬似コードとコンテナの作成は以下のとおりです。あなたは(これのConnectionFactoryを仮定すると、RabbitMQのJavaライブラリからのオブジェクトである)ConnectionFactorysetClientPropertiesメソッドを使用する必要があり、クライアントのプロパティを設定するための

public class MyChannelAwareListener implements ChannelAwareMessageListener 
{ 
    @Override 
    public void onMessage(Message message, Channel channel) throws Exception 
    { 
     msgProcessed = processMessage(message); 

     if(msgProcessed)  
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
     else 
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); 
    } 
} 

public static void main(String[] args) throws Exception 
{ 
    ConnectionFactory rabbitConnectionFactory; 
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH); 
    rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory"); 

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 

    MyChannelAwareListener listener = new MyChannelAwareListener(); 
    container.setMessageListener(listener); 
    container.setQueueNames("myQueue"); 
    container.setConnectionFactory(rabbitConnectionFactory); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    container.start(); 
} 

答えて

1

。この方法では、クライアントのプロパティと機能を含むMap<String, Object>が必要です。次の行は、RabbitMQのJavaライブラリ内のデフォルト値は次のとおりです。

Map<String,Object> props = new HashMap<String, Object>(); 
props.put("product", LongStringHelper.asLongString("RabbitMQ")); 
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION)); 
props.put("platform", LongStringHelper.asLongString("Java")); 
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT)); 
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE)); 

Map<String, Object> capabilities = new HashMap<String, Object>(); 
capabilities.put("publisher_confirms", true); 
capabilities.put("exchange_exchange_bindings", true); 
capabilities.put("basic.nack", true); 
capabilities.put("consumer_cancel_notify", true); 

props.put("capabilities", capabilities); 

私は春のAMQP抽象化とそれを行う方法を確認していないキャンセルACKおよび消費者を管理するための、しかし、あなたに与えるchannel.basicConsumeと完全になんとかですすべてのコールバックメソッドを介してすべてのシナリオを処理するための可能性:

http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/

は、この情報がお役に立てば幸い!