私はRabbitMQのは、交流(トピック)ウッシングとそれらの間に接続された2つのノードのサービスを持って、私は(結果なし)交換機を使ってRabbitMQキューを再利用するには?
をやっているし、私が達成したいものを説明します:
何私は、something.orange.something
にメッセージを送っている間に、シャットダウンC1
が欲しいです。それから私はC1
を再開し、私が失ったすべてのメッセージを受け取りたいと思います。
私の現在の状況は、コンシューマを再起動するたびに新しいキューが作成され、同じルーティングキーを使用してExchangeに新しいバインディングが作成されるということです。だから今私は同じ情報を受け取る2つのキューを持っています。
キューを{exclusive: true}
に設定すると、問題の一部が解決され、受信者なしのキューはなくなりましたが、同じ問題が発生しました。アクティブな受信者なしで送信されたすべてのメッセージが失われます。
可能ですか?
ここに私のコード:
送信者: reveiver
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
let ex = 'direct_colors';
let args = process.argv.slice(2);
let colors = ['colors.en.green', 'colors.en.yellow', 'colors.es.red']
ch.assertExchange(ex, 'topic', {durable: true});
setInterval(() => {
let color = colors[Math.floor(Math.random() * 3)];
let msg = `This is a ${color} message`;
ch.publish(ex, color, new Buffer(msg));
logatim[color.split('.').pop()].info(msg);
}, 1000);
});
});
:
'use strict';
const amqp = require('amqplib/callback_api');
const logatim = require('logatim');
logatim.setLevel('info');
const args = process.argv.slice(2);
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
var ex = 'direct_colors';
ch.assertExchange(ex, 'topic', {durable: true});
ch.assertQueue('', {exclusive: true, durable: true}, (err, q) => {
logatim.green.info(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach((arg) => {
ch.bindQueue(q.queue, ex, arg);
});
ch.consume(q.queue, (msg) => {
logatim[msg.fields.routingKey.split('.').pop()].info(` [x] ${msg.content.toString()}`);
});
});
});
});