2017-07-09 5 views
-1

Websocketを使った角型アプリケーションとtcpソケット経由のスナップサーバーとの間のプロキシとして機能するnodejsバックエンドがあります。Nodejs net複数のデータイベント

私はrxjsを使用してnodejs netソケットをSubjectとしてラップしています。 何らかの理由で、私のtcpソケットの 'data'イベントが各メッセージの送信にもう一度呼び出されます。私はそれがデータのストリームだ、TCPの動作どれだけある(node:21209) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit

const log = require('./logger').init('SNAPCAST'); 
const utils = require('./utils'); 
const Rx = require('rxjs/Rx'); 
const net = require('net'); 

const createSubject =() => { 

    return Rx.Observable.create((observer) => { 

    const socketConnection = net.connect({port: 1705},() => { 

     const socketObservable = Rx.Observable.create((observer) => { 
     socketConnection.on('data', (data) => { 

      if(data) { 
      String(data.toString().trim()).split('\n').forEach((line) => { 

       if(line) { 
       observer.next(JSON.parse(line)); 
       } 

      }); 
      } 

     }); 
     socketConnection.on('error', (err) => observer.error(err)); 
     socketConnection.on('close',() => observer.complete()); 
     }); 

     const socketObserver = { 
     next: (data) => { 
      if (!socketConnection.destroyed) { 
      socketConnection.write(`${JSON.stringify(data)}\r\n`); 
      } 
     } 
     }; 

     const socket = Rx.Subject.create(socketObserver, socketObservable); 

     observer.next(socket); 
     observer.complete(); 

    }); 

    socketConnection.on('error', (err) => { 
     observer.error(err); 
    }); 

    }); 

}; 

module.exports =() => { 

    return Rx.Observable.create((observer) => { 

    createSubject().subscribe((socket) => { 

     const sendRequest = (message) => { 

     message.id = message.id || utils.UUID(); 
     message.jsonrpc = message.jsonrpc || '2.0'; 

     socket.next(message); 

     return socket.filter((response) => { 
      return response.id === message.id; 
     }).first(); 

     }; 

     observer.next({ 
     proxyRequest: (request) => { 
      return sendRequest(request); 
     } 
     }); 

     observer.complete(); 

    }, (err) => { 
     observer.error(err); 
    }); 

    }); 


}; 
+0

あなたの質問のコードサンプルを簡略化して、より良い回答を得るようにしてください。 –

答えて

-1

を取得し、いくつかの要求の後

。したがって、アプリケーションレベルのメッセージは1つまたは複数のイベントに到着することがあります。'data'どちらの状況にも対処する準備ができている必要があります。

+0

1つのフレームが適合しないので、tcpチャンクはマルチプル部分に到着することがあります。それは問題ではありません。私のデータイベントは、データ全体で複数回発生します。 jsonは1つのフレームに収まります。 – Pascal

0

リスナーを削除しないと、イベントエミッタがリークする可能性があります。これはあなたのコードの例のようです。私は次のようにコードを変更を示唆している:

Observable.create(obs => { 
    function handleSocketData = data => {}; 

    socketConnection.addEventListener('data', handleSocketData); 

    //unsubscribe function 
    return() => { 
    socketConnection.removeEventListener('data', handleSocketData);  
    } 
}); 

これはあなたのイベントリスナをするときに観察完了/エラー/サブスクライブ解除削除されることになります。

上記のコードは、追加/削除イベントリスナロジックを抽象化するObservable.fromEvent(socketConnection, 'data')を使用すると、さらに簡単になります。

+0

ありがとう、私はそれを試みます。私の推測では、 'sendRequest'関数は、.filter()の問題を引き起こし、.first()を追加すると、最初に一致した後に自動的に退会するというものでした。 – Pascal

関連する問題