2017-06-28 16 views
0

Node.jsとWebSocketsを使用してシンプルでスケーラブルなチャットアプリケーションを構築しようとしています。私はNodeインスタンス間のメッセージブローカーとしてRabbitMQを使用しています。私はNginxの背後にあるNode clusteringにpm2を使用しています。Node.js cluster(pm2) - 閉じたWebSocketsはメッセージを受信します

私が抱えている問題は、既に閉じているWebSocketが開いているものではなく新しいメッセージを受信して​​いることです。ここで

私のWebSocketサーバーのコードです:

var url = require('url'); 
var WebSocket = require('ws'); 
var rabbit = require('amqplib').connect('amqp://localhost'); 
var express = require('express'); 
var http = require('http'); 

const app = express(); 
const server = http.createServer(app); 

var port = process.env.PORT || 3000; 

server.listen(port, function() { 
    console.log('Express server listening on port ' + server.address().port); 
}); 

const q = "messages"; 

const wss = new WebSocket.Server({ server }); 

wss.on('connection', function onConnection(ws) { 

    console.log('got websocket connection'); 

    rabbit.then(function(conn) { 
     ws.conn = conn; 
     console.log('consumer connected to rabbitMQ'); 
     return conn.createChannel(); 
    }).then(function(ch) { 
     console.log('consumer channel created'); 
     return ch.assertQueue(q).then(function(ok) { 
      console.log('consumer asserted queue'); 
      return ch.consume(q, function(msg) { 
       console.log('received message from rabbitMQ: ', msg.content.toString()); 
       if (ws.readyState === 1) { 
        if (msg && msg.content) 
         ws.send(msg.content.toString()); 
       } else { 
        console.warn('Websocket is not open, state is ', ws.readyState); 
       } 
      }, { noAck: true }); 
     }); 
    }).catch(function handleConsumerRabbitErr(err) { 
     console.warn('Handled rabbit consumer error:', err.stack); 
    }); 

    ws.on('message', function onMessage(msg) { 

     console.log('got websocket message'); 

     if (ws.conn) { 
      ws.conn.createChannel().then(function(ch) { 
       console.log('producer channel created'); 
       return ch.assertQueue(q).then(function(ok) { 
        console.log('sending ' + msg + ' to rabbitMQ'); 
        return ch.sendToQueue(q, Buffer(msg)); 
       }); 
      }).catch(function handleProducerRabbitErr(err) { 
       console.warn('Handled rabbit producer error:', err.stack); 
      }); 
     } 

    }); 

    ws.on('close', function onClose() { 
     console.log('websocket closing'); 
    }); 
}); 

私のnginxの設定ファイル:

http { 

    upstream nodejs { 
     server localhost:3000; 
    } 

    map $http_upgrade $connection_upgrade { 
     default upgrade; 
     '' close; 
    } 

    server { 
     listen 80; 
     listen 443 ssl; 
     server_name 192.168.45.45; 
     root /vagrant/static; 

     # Redirect HTTP to HTTPS 
     if ($scheme = http) { 
      return 301 https://$server_name$request_uri; 
     } 

     proxy_http_version 1.1; 
     proxy_set_header Host $host; 
     proxy_set_header Connection ""; 
     proxy_set_header X-Real-IP $remote_addr; 
     proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; 

     ssl_certificate /etc/ssl/certs/nginx-selfsigned.crt; 
     ssl_certificate_key /etc/ssl/private/nginx-selfsigned.key; 
     ssl_session_cache shared:SSL:1m; 
     ssl_prefer_server_ciphers on; 

     location /ws/ { 
      proxy_set_header Upgrade $http_upgrade; 
      proxy_set_header Connection $connection_upgrade; 
      proxy_pass http://nodejs; 
     } 

     location /app/ { 
      proxy_pass http://nodejs; 
     } 
    } 
} 

私は(私のVM上の2つのノードを作成します)、次のコマンドを使用してアプリケーションを実行しています:ここで

$ pm2 app.js -i 0 --name chat

pm2 log後方の出力でありますえー、WebSocketのメッセージを送信するページを更新し、2つの以上のメッセージを送信する:

[STREAMING] Now streaming realtime logs for [all] processes 
0|chat  | got websocket connection    # Initial websocket connection on node 0 
0|chat  | consumer connected to rabbitMQ 
0|chat  | consumer channel created 
0|chat  | consumer asserted queue 
0|chat  | got websocket message 
0|chat  | producer channel created 
0|chat  | sending test to rabbitMQ 
0|chat  | received message from rabbitMQ: test # Websocket received message from RabbitMQ, as expected 
0|chat  | websocket closing 
1|chat  | got websocket connection    # Page refresh, new websocket connection on node 1 
1|chat  | consumer connected to rabbitMQ 
1|chat  | consumer channel created 
1|chat  | consumer asserted queue 
1|chat  | got websocket message 
1|chat  | producer channel created 
1|chat  | sending test to rabbitMQ 
0|chat  | received message from rabbitMQ: test # ERROR - old websocket received message from RabbitMQ! 
0|chat  | Websocket is not open, state is 3  # Note these two lines are from node 0 
1|chat  | got websocket message     # Sent another message without refreshing the page 
1|chat  | producer channel created 
1|chat  | sending test to rabbitMQ    # This one succeeds, and it alternates from here 
1|chat  | received message from rabbitMQ: test 

私はそれは、nは別名、私が開いたのWebSocketの数である(送信されるすべてのn番目のメッセージ、に成功していることがわかりました。 WebSocketが閉じられているかどうかにかかわらず、ページをリフレッシュした回数)。

WebSocketが閉じられた後、それはmessageイベントがもう発生しないはずです。明らかにそれは事実ではない。

WebSocketが閉じられた後にメッセージの受信を停止するにはどうすればよいですか?

答えて

0

もし私がRTFM for RabbitMQを持っていたら、キューからの消費はデフォルトではラウンドロビンであることを知りました。実際、私が探していたのはpub/sub with exchangesでした。

関連する問題