2016-10-17 18 views
1

私はRabbitMQキューからレコードを手動で取得する必要があります。以下は私がこれを行うために作成したものです。しかし、これを行うには、より良いSpring/RabbitMQの方法があるのだろうか?RabbitMQ Springからメッセージを手動で取り出す方法

channel.basicAckとchannel.basicNackが使用されています。コンシューマが例外をスローした場合に、レコードがキューに入れられるように、これらのレコードを認識可能/ NotAcknowledgeに配置しました。

私はRabbitTemplate.receiveAndConvertを使用していましたが、これは承認されない理由があっても自動的にレコードを認識します。

import java.util.Properties; 
import java.util.function.Consumer; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.core.AmqpAdmin; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.MessageProperties; 
import org.springframework.amqp.rabbit.core.ChannelCallback; 
import org.springframework.amqp.rabbit.core.RabbitAdmin; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; 
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.DefaultConsumer; 
import com.rabbitmq.client.GetResponse; 

@Service 
public class RabbitAdminServices { 

    private static final Logger logger = LoggerFactory.getLogger(RabbitAdminServices.class); 

    @Autowired 
    AmqpAdmin rabbitAdmin; 

    @Autowired 
    RabbitTemplate rabbitTemplate; 

    @Autowired 
    MessageConverter messageConverter; 

    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); 

    public int getCount(String queueName) { 

     Properties properties = rabbitAdmin.getQueueProperties(queueName); 
     return (Integer)properties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT); 
    } 

    public <T> void processQueue(String queueName, Integer count, Class<T> clazz, Consumer<T> consumer) { 

     int reprocessCount = getCount(queueName); 
     int requestCount = reprocessCount; 
     if(count != null) { 
      requestCount = count; 
     } 
     for(int i = 0; i < reprocessCount && i < requestCount; i++) { 
      rabbitTemplate.execute(new ChannelCallback<T>() { 

       @Override 
       public T doInRabbit(Channel channel) throws Exception { 
        GetResponse response = channel.basicGet(queueName, false); 
        T result = null; 
        try { 
         MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8"); 
         if(response.getMessageCount() >= 0) { 
          messageProps.setMessageCount(response.getMessageCount()); 
         } 
         Message message = new Message(response.getBody(), messageProps); 
         result = (T)messageConverter.fromMessage(message); 
         consumer.accept(result); 
         channel.basicAck(response.getEnvelope().getDeliveryTag(), false); 
        } 
        catch(Exception e) { 
         channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true); 
        } 
        return result; 
       } 
      }); 

     } 
    } 
} 

答えて

0

解決策は現在のテンプレートの状態で正しいです。我々は、おそらく

<T> void template.receiveAndConvert(..., Class<T> clazz, Consumer<T> consumer); 

のように、RabbitTemplateに何かをオーバーロードされたメソッドを追加することを検討してテンプレートACK/NACK消費者が例外をスローするかどうかに基づいている必要があります。そうすれば、コード内で変換作業を行う必要がなくなります。

+0

ありがとうございます。私は当分の間、正しい軌道に乗っていることを知ってうれしく思います。そして、将来のいくつかの反復で上記のようなオーバーロードされたメソッドを使用できるようになることを楽しみにしています。 – user3768622

関連する問題