2015-01-05 8 views
6

クライアントがRabbitMQキューからメッセージを消費するsocket.ioサーバーを実行しているnode.jsスクリプトがあります。最近、Amazon AWSに移行しました.RabbitMQは2台のマシン(冗長インスタンス)のクラスタになりました。 AMQP接続は随時失われます(冗長VMを備えた高可用性環境から到着する制限であり、対処が必要です)。再接続を試みると、DNSはどのインスタンスを接続するかを選択しますデータレプリケーションを持つクラスタなので、どのインスタンスを接続するかは関係ありません)。amqp.nodeは接続のドロップを検出しません

問題は、再接続を試みたことがないことです。しばらくすると、接続が失われたとき、amqp.nodeは明らかに接続が失われたことに気づくことができません。また、消費者はメッセージの受信を停止し、socket.ioサーバーは単に新しい接続の受け入れを停止します。

RabbitMQ URLで設定された55秒のハートビートタイムアウト(socket.ioハートビートタイムアウトと混同しないでください)があり、amqp.nodeのコールバックAPIを使用して 'エラー'と '終了'イベントを確認していますが、明らかに発行されたことはありません。待ち行列は、消費されたメッセージがackされることを期待する。ノードスクリプトが接続の喪失を検出して終了するので、環境は自動的に新しいプロセスを開始し、接続を再確立します。

ここにコードがありますが、おそらくamqp.nodeコールバックAPIなどで何か問題があります。

var express = require('express'); 
app = express(); 
var http = require('http'); 
var serverio = http.createServer(app); 
var io = require('socket.io').listen(serverio, { log: false }); 
var socket; 
var allcli = []; 
var red, blue, green, magenta, reset; 
red = '\033[31m'; 
blue = '\033[34m'; 
green = '\033[32m'; 
magenta = '\033[35m'; 
orange = '\033[43m'; 
reset = '\033[0m'; 

var queue = 'ha.atualizacao_mobile'; 
var urlRabbit = 'amqp://login:[email protected]?heartbeat=55' // Amazon 
var amqp = require('amqplib/callback_api'); 
var debug = true; 

console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds."); 
io.set('heartbeat interval', 10 * 60); 
console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients."); 

console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds."); 
io.set('heartbeat timeout', 11 * 60); 
console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds."); 


io.sockets.on('connection', function(socket){ 

    socket.on('error', function (exc) { 
     console.log(orange+"Ignoring exception: " + exc + reset); 
    }); 

    socket.on('send-indice', function (data) { 
     // Some business logic 
    }); 

    socket.on('disconnect', function() { 
     // Some business logic 
    }); 

}); 

function updatecli(data){ 
    // Some business logic 
} 

amqp.connect(urlRabbit, null, function(err, conn) { 
    if (err !== null) { 
     return console.log("Error creating connection: " + err); 
    } 

    conn.on('error', function(err) { 
     console.log("Generated event 'error': " + err); 
    }); 

    conn.on('close', function() { 
     console.log("Connection closed."); 
     process.exit(); 
    }); 

    processRabbitConnection(conn, function() { 
     conn.close(); 
    }); 
}); 

function processRabbitConnection(conn, finalize) { 
    conn.createChannel(function(err, channel) { 

     if (err != null) { 
      console.log("Error creating channel: " + err); 
      return finalize(); 
     } 

     channel.assertQueue(queue, null, function(err, ok) { 
      if (err !== null) { 
        console.log("Error asserting queue " + queue + ": " + err); 
        return finalize(); 
      } 

      channel.consume(queue, function (msg) { 
       if (msg !== null) { 
        try { 
         var dataObj = JSON.parse(msg.content); 
         if (debug == true) { 
          //console.log(dataObj); 
         } 
         updatecli(dataObj); 
        } catch(err) { 
         console.log("Error in JSON: " + err); 
        } 
        channel.ack(msg); 
       } 
      }, null, function(err, ok) { 
       if (err !== null) { 
        console.log("Error consuming message: " + err); 
        return finalize(); 
       } 
      }); 
     }); 
    }); 
} 

serverio.listen(9128, function() { 
    console.log('Server: Socket IO Online - Port: 9128 - ' + new Date()); 
}); 

答えて

7

明らかに問題は解決されています。近くの60秒の鼓動が問題でした。これはRabbitMQロードバランサと競合し、データが接続を通過したかどうかを1分ごとにチェックします(データがなくなると接続が切断されます)。 AMQP接続はメッセージの受信を停止し、ライブラリは明らかにそれに反応しません。この状況を回避するためには、より低い心拍(例えば、30秒)が必要である。

関連する問題