「ヒューストン私たちはここに問題があります。」イベントを処理する最初の試行で失敗した後、 。 このシナリオでは、デッドレター交換を実装しました。Spring AMQP - TTLでデッドレターメカニズムを使用したメッセージ再キューイング
メッセージが失敗し、DLX - >再試行キューにルーティングされ、別の試行のために5分のTTL後に作業キューに戻ります。ここで
は、私が使用していた構成である。
public class RabbitMQConfig {
@Bean(name = "work")
@Primary
Queue workQueue() {
return new Queue(WORK_QUEUE, true, false, false, null);
}
@Bean(name = "workExchange")
@Primary
TopicExchange workExchange() {
return new TopicExchange(WORK_EXCHANGE, true, false);
}
@Bean
Binding workBinding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(workQueue()).to(workExchange()).with("#");
}
@Bean(name = "retryExchange")
FanoutExchange retryExchange() {
return new FanoutExchange(RETRY_EXCHANGE, true, false);
}
@Bean(name = "retry")
Queue retryQueue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", WORK_EXCHANGE);
args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min
return new Queue(RETRY_QUEUE, true, false, false, args);
}
@Bean
Binding retryBinding(Queue queue,FanoutExchange exchange) {
return BindingBuilder.bind(retryQueue()).to(retryExchange());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
Consumer receiver() {
return new Consumer();
}
@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
Producer.java:
@GetMapping(path = "/hello")
public String sayHello() {
// Producer operation
String messages[];
messages = new String[] {" hello "};
for (int i = 0; i < 5; i++) {
String message = util.getMessage(messages)+i;
rabbitTemplate.convertAndSend("WorkExchange","", message);
System.out.println(" Sent '" + message + "'");
}
return "hello";
}
Consumer.java:
public class Consumer {
@RabbitListener(queues = "WorkQueue")
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException {
try {
System.out.println("message to be processed: " + message);
doWorkTwo(message);
channel.basicAck(tag, false);
} catch (Exception e) {
System.out.println("In the exception catch block");
System.out.println("message in dead letter exchange: " + message);
channel.basicPublish("RetryExchange", "", null, message.getBytes());
}
}
private void doWorkTwo(String task) throws InterruptedException {
int c = 0;
int b = 5;
int d = b/c;
}
}
が、それは死者を使用する正しい方法です。私のシナリオでの文字の交換とは、RETRY QUEUEで5分間待ってから、 2回目は、RETRY QUEUEで5分間待たずに(私はTTLを5分と言いました)、すぐにWORK QUEUEに移動します。
私はlocalhost:8080/hello URLを入力してこのアプリケーションを実行しています。
これは私のアップデートされた設定です。
RabbitMQConfig.java:
@EnableRabbit
public class RabbitMQConfig {
final static String WORK_QUEUE = "WorkQueue";
final static String RETRY_QUEUE = "RetryQueue";
final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange
final static String RETRY_EXCHANGE = "RetryExchange";
final static int RETRY_DELAY = 60000; // in ms (1 min)
@Bean(name = "work")
@Primary
Queue workQueue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", RETRY_EXCHANGE);
return new Queue(WORK_QUEUE, true, false, false, args);
}
@Bean(name = "workExchange")
@Primary
DirectExchange workExchange() {
return new DirectExchange(WORK_EXCHANGE, true, false);
}
@Bean
Binding workBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(workQueue()).to(workExchange()).with("");
}
@Bean(name = "retryExchange")
DirectExchange retryExchange() {
return new DirectExchange(RETRY_EXCHANGE, true, false);
}
// Messages will drop off RetryQueue into WorkExchange for re-processing
// All messages in queue will expire at same rate
@Bean(name = "retry")
Queue retryQueue() {
Map<String, Object> args = new HashMap<String, Object>();
//args.put("x-dead-letter-exchange", WORK_EXCHANGE);
//args.put("x-message-ttl", RETRY_DELAY);
return new Queue(RETRY_QUEUE, true, false, false, null);
}
@Bean
Binding retryBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(retryQueue()).to(retryExchange()).with("");
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(false);
/*factory.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer())
.backOffOptions(1000, 2, 5000)
.build()
});*/
return factory;
}
@Bean
Consumer receiver() {
return new Consumer();
}
@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
Consumer.java:DLQにブローカールートそれは、あなたがx-death
ヘッダを調べることができますので、あなたがメッセージを拒否した場合
public class Consumer {
@RabbitListener(queues = "WorkQueue")
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long tag,
@Header(required = false, name = "x-death") HashMap<String, String> xDeath)
throws IOException, InterruptedException {
doWorkTwo(message);
channel.basicAck(tag, false);
}
private void doWorkTwo(String task) {
int c = 0;
int b = 5;
if (c < b) {
throw new AmqpRejectAndDontRequeueException(task);
}
}
}
どんな意味がありません。何が示唆されていますあなたが再試行待ち行列に自分自身でパブリッシュしているので、ブローカはこれが2回目の再試行であるということを知らず、それは単に新しいメッセージであり、別のアクションをとることはありません。 –
ありがとう@GaryRussell。ブローカが2回目の再試行などを確認できるシナリオをどのように達成できますか?私はいくつかのことを試みましたが、常に1としてカウントを取得します。 retryQueueへのパブリッシュは自動的にどうなりますか? – Diva04
直接拒否して再キューした場合、ブローカには再配信されたという兆候しかありません。あなたがDLQを拒否すれば、ブローカーは 'x-death'ヘッダーでカウントを保持します - 私の答えを見てください。自分でDLQに公開する場合は、自分のヘッダーでカウントを保持する必要があります。 –