私は、フォークされたプロセスからの無限のデータストリームを持っています。私はこのストリームをモジュールで処理したいときがあります。このストリームからデータを複製して、別のモジュールで処理することが必要な場合もあります(たとえば、データストリームを監視していますが、何か面白いことが起こったら、さらなる調査)。NodeJSストリーム分割
それでは、次のシナリオを想定してみましょう。
- 私はプログラムを起動し、読み込み可能ストリームを消費し始める
- 2秒後、私は別のストリームリーダー によって1秒間に同じデータを処理したいです
- 時間が過ぎると、私は第2の消費者を閉じたいが、元の消費者は元のままでなければならない。ここで
、このためのコードスニペットです:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
ここに私の問題は、STREAM2をunpipingすると、それが閉じて取得していないということです。私はunpipe
コマンドの後stream.end()
を呼び出した場合、それはエラーでクラッシュ:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
私も第二の流れからフラッシュされるが、それはどちらか動作しませんでしたバッファを支援するために、ソース・ストリームを一時停止しようとしました。
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function() {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
ここまでと同じエラー(後回し書き込み)。
どのように問題を解決するには?私の本来の目的は、ストリームデータを1つのポイントから複製し、しばらくしてから2番目のストリームを閉じることです。
Note: I tried to use this answer to make it work.