公開機能の外側の接続(amqpConn)および発行者チャネル(pubChannel)を定義し、使用
Channel.js
var amqp = require('amqplib/callback_api');
var url = process.env.AMQP_URL || 'amqp://guest:[email protected]:5672';
module.exports = createQueueChannel;
function createQueueChannel(queue, cb) {
console.log("connecting................");
amqp.connect(url, onceConnected);
function onceConnected(err, conn) {
if (err) {
console.error('Error connecting:', err.stack);
}
else {
console.log('connected');
conn.createChannel(onceChannelCreated);
}
function onceChannelCreated(err, channel) {
if (err) {
cb(err);
}
else {
channel.assertQueue(queue, {durable: true}, onceQueueCreated);
}
function onceQueueCreated(err) {
if (err) {
cb(err);
}
else {
cb(null, channel, conn);
}
}
}
}
}
ことメッセージを公開するときに使用します。オフラインキューが場合に使用されてもhttps://gist.github.com/carlhoerberg/006b01ac17a0a94859ba (https://www.cloudamqp.com/blog/2015-05-19-part2-2-rabbitmq-for-beginners_example-and-sample-code-node-js.html) 接続がダウンしてしばらくの間です:
は、私はあなたがここに完全なサンプルコードを見てみることをお勧めします。
また、接続すると、パブリッシャが起動します。
function whenConnected() {
startPublisher()
}
var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
amqpConn.createConfirmChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
pubChannel = ch;
while (true) {
var m = offlinePubQueue.shift();
if (!m) break;
publish(m[0], m[1], m[2]);
}
});
}
などの機能を公開:
function publish(exchange, routingKey, content) {
try {
pubChannel.publish(exchange, routingKey, content, { persistent: true },
function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
pubChannel.connection.close();
}
}
);
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
}