2017-04-07 15 views
0

Spring AMQP/Rabbit MQの予定/遅延メッセージの方法を見つけ出すのに苦労しています。ここで解決策が見つかりました。しかし、私はまだプロンプトを持っています。Spring AMQP/Rabbit MQについては メッセージを受信して​​いません。Spring AMQP遅延メッセージ(rabbitMQ)

以下のようにマイ源:

@ConfigurationパブリッククラスAmqpConfig {

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
    connectionFactory.setAddresses("172.16.101.14:5672"); 
    connectionFactory.setUsername("admin"); 
    connectionFactory.setPassword("admin"); 
    connectionFactory.setPublisherConfirms(true); 
    return connectionFactory; 
} 


@Bean 
@Scope("prototype") 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    return template; 
} 


@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

@Bean 
public Queue queue() { 
    return new Queue("spring-boot-queue", true); 

} 

@Bean 
Binding binding(Queue queue, Exchange delayExchange) { 
    return BindingBuilder.bind(queue).to(delayExchange).with("spring-boot-queue").noargs(); 
} 

@Bean 
public SimpleMessageListenerContainer messageContainer() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); 
    container.setQueues(queue()); 
    container.setExposeListenerChannel(true); 
    container.setMaxConcurrentConsumers(1); 
    container.setConcurrentConsumers(1); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 

    container.setMessageListener(new ChannelAwareMessageListener() { 

     public void onMessage(Message message, Channel channel) throws Exception { 
      byte[] body = message.getBody(); 
      System.err.println("receive msg : " + new String(body)); 
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 

     } 
    }); 

    return container; 
} 

}

@Componentパブリッククラス送信がRabbitTemplate.ConfirmCallback {

private RabbitTemplate rabbitTemplate; 

@Autowired 
public Send(RabbitTemplate rabbitTemplate) { 
    this.rabbitTemplate = rabbitTemplate; 
    this.rabbitTemplate.setConfirmCallback(this); 
    rabbitTemplate.setMandatory(true); 
} 

public void sendMsg(String content) { 

    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); 

    rabbitTemplate.convertAndSend("my-exchange", "", content, new MessagePostProcessor() { 
     @Override 
     public Message postProcessMessage(Message message) throws AmqpException { 
      message.getMessageProperties().setHeader("x-delay", 6000); 
      return message; 
     } 
    },correlationId); 

    System.err.println("delay message send ................"); 

} 

/** 
* 回调 
*/ 
@Override 
public void confirm(CorrelationData correlationData, boolean ack, String cause) { 

    System.err.println(" callback id :" + correlationData); 

    if (ack) { 
     System.err.println("ok"); 
    } else { 
     System.err.println("fail:" + cause); 
    } 
} 
を実装

}

誰かが助けてくれる人がいますか?

ありがとうございます。

答えて

1

遅延メッセージは、Springのamqpとは関係ありません。これは、あなたのコードに存在するライブラリなので、ライブラリはメッセージを保持できません。

旧アプローチ:: は、各メッセージ/キュー(ポリシー)でTTL(生存時間)ヘッダーを設定し、それを処理するDLQを紹介し、あなたが試すことができる2つのアプローチがあります。 ttlが期限切れになると、メッセージはDLQからメインキューに移動し、リスナーが処理できます。

最新のアプローチ: 最近のRabbitMQは、あなたが同じとRabbitMQの-3.5.8以降で使用可能このプラグインのサポートを実現することができ使用して、をプラグインRabbitMQの遅延メッセージを思い付きました。

x-delayed-messageタイプの交換を宣言してから、メッセージの遅延時間をミリ秒単位で表すカスタムヘッダーx-delayを使用してメッセージをパブリッシュできます。メッセージは、x-delayミリ秒ここ

より後の各キューに配信されます:git私はあなたが言及したプラグインのメッセージを遅延RabbitMQの、おかげで、最高点を必要とするものである

+0

。 – squall

+0

あなたの交換機の減速を送信者と受信者の両方から見てください – lambodar

関連する問題