Node.jsとWebSocketsを使用してシンプルでスケーラブルなチャットアプリケーションを構築しようとしています。私はNodeインスタンス間のメッセージブローカーとしてRabbitMQを使用しています。私はNginxの背後にあるNode clusteringにpm2を使用しています。Node.js cluster(pm2) - 閉じたWebSocketsはメッセージを受信します
私が抱えている問題は、既に閉じているWebSocketが開いているものではなく新しいメッセージを受信していることです。ここで
私のWebSocketサーバーのコードです:
var url = require('url');
var WebSocket = require('ws');
var rabbit = require('amqplib').connect('amqp://localhost');
var express = require('express');
var http = require('http');
const app = express();
const server = http.createServer(app);
var port = process.env.PORT || 3000;
server.listen(port, function() {
console.log('Express server listening on port ' + server.address().port);
});
const q = "messages";
const wss = new WebSocket.Server({ server });
wss.on('connection', function onConnection(ws) {
console.log('got websocket connection');
rabbit.then(function(conn) {
ws.conn = conn;
console.log('consumer connected to rabbitMQ');
return conn.createChannel();
}).then(function(ch) {
console.log('consumer channel created');
return ch.assertQueue(q).then(function(ok) {
console.log('consumer asserted queue');
return ch.consume(q, function(msg) {
console.log('received message from rabbitMQ: ', msg.content.toString());
if (ws.readyState === 1) {
if (msg && msg.content)
ws.send(msg.content.toString());
} else {
console.warn('Websocket is not open, state is ', ws.readyState);
}
}, { noAck: true });
});
}).catch(function handleConsumerRabbitErr(err) {
console.warn('Handled rabbit consumer error:', err.stack);
});
ws.on('message', function onMessage(msg) {
console.log('got websocket message');
if (ws.conn) {
ws.conn.createChannel().then(function(ch) {
console.log('producer channel created');
return ch.assertQueue(q).then(function(ok) {
console.log('sending ' + msg + ' to rabbitMQ');
return ch.sendToQueue(q, Buffer(msg));
});
}).catch(function handleProducerRabbitErr(err) {
console.warn('Handled rabbit producer error:', err.stack);
});
}
});
ws.on('close', function onClose() {
console.log('websocket closing');
});
});
私のnginxの設定ファイル:
http {
upstream nodejs {
server localhost:3000;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 80;
listen 443 ssl;
server_name 192.168.45.45;
root /vagrant/static;
# Redirect HTTP to HTTPS
if ($scheme = http) {
return 301 https://$server_name$request_uri;
}
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header Connection "";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
ssl_certificate /etc/ssl/certs/nginx-selfsigned.crt;
ssl_certificate_key /etc/ssl/private/nginx-selfsigned.key;
ssl_session_cache shared:SSL:1m;
ssl_prefer_server_ciphers on;
location /ws/ {
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_pass http://nodejs;
}
location /app/ {
proxy_pass http://nodejs;
}
}
}
私は(私のVM上の2つのノードを作成します)、次のコマンドを使用してアプリケーションを実行しています:ここで
$ pm2 app.js -i 0 --name chat
pm2 log
後方の出力でありますえー、WebSocketのメッセージを送信するページを更新し、2つの以上のメッセージを送信する:
[STREAMING] Now streaming realtime logs for [all] processes
0|chat | got websocket connection # Initial websocket connection on node 0
0|chat | consumer connected to rabbitMQ
0|chat | consumer channel created
0|chat | consumer asserted queue
0|chat | got websocket message
0|chat | producer channel created
0|chat | sending test to rabbitMQ
0|chat | received message from rabbitMQ: test # Websocket received message from RabbitMQ, as expected
0|chat | websocket closing
1|chat | got websocket connection # Page refresh, new websocket connection on node 1
1|chat | consumer connected to rabbitMQ
1|chat | consumer channel created
1|chat | consumer asserted queue
1|chat | got websocket message
1|chat | producer channel created
1|chat | sending test to rabbitMQ
0|chat | received message from rabbitMQ: test # ERROR - old websocket received message from RabbitMQ!
0|chat | Websocket is not open, state is 3 # Note these two lines are from node 0
1|chat | got websocket message # Sent another message without refreshing the page
1|chat | producer channel created
1|chat | sending test to rabbitMQ # This one succeeds, and it alternates from here
1|chat | received message from rabbitMQ: test
私はそれは、nは別名、私が開いたのWebSocketの数である(送信されるすべてのn番目のメッセージ、に成功していることがわかりました。 WebSocketが閉じられているかどうかにかかわらず、ページをリフレッシュした回数)。
WebSocketが閉じられた後、それはmessage
イベントがもう発生しないはずです。明らかにそれは事実ではない。
WebSocketが閉じられた後にメッセージの受信を停止するにはどうすればよいですか?