クライアントがRabbitMQキューからメッセージを消費するsocket.ioサーバーを実行しているnode.jsスクリプトがあります。最近、Amazon AWSに移行しました.RabbitMQは2台のマシン(冗長インスタンス)のクラスタになりました。 AMQP接続は随時失われます(冗長VMを備えた高可用性環境から到着する制限であり、対処が必要です)。再接続を試みると、DNSはどのインスタンスを接続するかを選択しますデータレプリケーションを持つクラスタなので、どのインスタンスを接続するかは関係ありません)。amqp.nodeは接続のドロップを検出しません
問題は、再接続を試みたことがないことです。しばらくすると、接続が失われたとき、amqp.nodeは明らかに接続が失われたことに気づくことができません。また、消費者はメッセージの受信を停止し、socket.ioサーバーは単に新しい接続の受け入れを停止します。
RabbitMQ URLで設定された55秒のハートビートタイムアウト(socket.ioハートビートタイムアウトと混同しないでください)があり、amqp.nodeのコールバックAPIを使用して 'エラー'と '終了'イベントを確認していますが、明らかに発行されたことはありません。待ち行列は、消費されたメッセージがackされることを期待する。ノードスクリプトが接続の喪失を検出して終了するので、環境は自動的に新しいプロセスを開始し、接続を再確立します。
ここにコードがありますが、おそらくamqp.nodeコールバックAPIなどで何か問題があります。
var express = require('express');
app = express();
var http = require('http');
var serverio = http.createServer(app);
var io = require('socket.io').listen(serverio, { log: false });
var socket;
var allcli = [];
var red, blue, green, magenta, reset;
red = '\033[31m';
blue = '\033[34m';
green = '\033[32m';
magenta = '\033[35m';
orange = '\033[43m';
reset = '\033[0m';
var queue = 'ha.atualizacao_mobile';
var urlRabbit = 'amqp://login:[email protected]?heartbeat=55' // Amazon
var amqp = require('amqplib/callback_api');
var debug = true;
console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds.");
io.set('heartbeat interval', 10 * 60);
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients.");
console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds.");
io.set('heartbeat timeout', 11 * 60);
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds.");
io.sockets.on('connection', function(socket){
socket.on('error', function (exc) {
console.log(orange+"Ignoring exception: " + exc + reset);
});
socket.on('send-indice', function (data) {
// Some business logic
});
socket.on('disconnect', function() {
// Some business logic
});
});
function updatecli(data){
// Some business logic
}
amqp.connect(urlRabbit, null, function(err, conn) {
if (err !== null) {
return console.log("Error creating connection: " + err);
}
conn.on('error', function(err) {
console.log("Generated event 'error': " + err);
});
conn.on('close', function() {
console.log("Connection closed.");
process.exit();
});
processRabbitConnection(conn, function() {
conn.close();
});
});
function processRabbitConnection(conn, finalize) {
conn.createChannel(function(err, channel) {
if (err != null) {
console.log("Error creating channel: " + err);
return finalize();
}
channel.assertQueue(queue, null, function(err, ok) {
if (err !== null) {
console.log("Error asserting queue " + queue + ": " + err);
return finalize();
}
channel.consume(queue, function (msg) {
if (msg !== null) {
try {
var dataObj = JSON.parse(msg.content);
if (debug == true) {
//console.log(dataObj);
}
updatecli(dataObj);
} catch(err) {
console.log("Error in JSON: " + err);
}
channel.ack(msg);
}
}, null, function(err, ok) {
if (err !== null) {
console.log("Error consuming message: " + err);
return finalize();
}
});
});
});
}
serverio.listen(9128, function() {
console.log('Server: Socket IO Online - Port: 9128 - ' + new Date());
});