2015-10-28 18 views
14

私は書き込み可能なストリームであるモジュールを書いています。私は自分のユーザーのためのパイプインターフェイスを実装したい。パイプで読み込み可能なストリームをnodejs内の書き込み可能なストリームから一時停止する正しい方法は何ですか?

エラーが発生した場合は、読み込み可能なストリームを一時停止してエラーイベントを発生させる必要があります。次に、ユーザーはエラーが発生しても問題がなければ、データ処理を再開することができます。

var writeable = new BackPressureStream(); 
writeable.on('error', function(error){ 
    console.log(error); 
    writeable.resume(); 
}); 

var readable = require('fs').createReadStream('somefile.txt'); 
readable.pipe.(writeable); 

私は、ノードが読み込み可能なストリームを一時停止するために使用することができreadable.pause()方法、を提供してくれていることがわかります。しかし、私は私の書き込み可能なストリームモジュールから呼び出すことができますどのように取得することはできません。

var Writable = require('stream').Writable; 

function BackPressureStream(options) { 
    Writable.call(this, options); 
} 
require('util').inherits(BackPressureStream, Writable); 

BackPressureStream.prototype._write = function(chunk, encoding, done) { 
    done(); 
}; 

BackPressureStream.prototype.resume = function() { 
    this.emit('drain'); 
} 

どのように背圧が書き込み可能なストリームで実装することができますか?

P.S.可読ストリームをパラメータとして提供するイベントはpipe/unpipeです。しかし、パイプストリームの場合、一時停止する唯一のチャンスは、読み取り可能なストリームを書き込み可能からアンペアすることです。

私はそれを正しくしましたか?ユーザーが再開するまで書き込み可能なストリームをアンペアする必要がありますか?そして、ユーザーの呼び出しを再開した後、私はパイプで読み込み可能なストリームを返す必要がありますか?

+1

をバックプレッシャー処理上の別の優れたリソースですか? –

+1

ねえ、あなたの質問に対する答えは見つかりましたか? –

答えて

0

基本的に、私が理解しているように、エラーイベントが発生した場合には、ストリームにバックプレッシャーをかけようとしています。あなたにはいくつかの選択肢があります。

最初に、既に特定したように、pipeを使用して、読み込みストリームのインスタンスを取得し、派手なフットワークを行います。

別のオプションは、(すなわち、それは、入力としてWritableStreamを取り、ストリーム関数を実装する場合、供給されたストリームに沿ってデータを渡し、この機能を提供してラップ書き込み可能なストリームを作成することです。

基本的にあなたが終わります書き込み可能なストリームを実装して

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream扱っています。

キーfoのようなものでrの場合、基本となる書き込み可能ファイルにエラーが発生した場合は、ストリームにフラグを設定し、次にwriteを呼び出すと、チャンクをバッファリングし、コールバックを格納して呼び出します。

// ... 
constructor(wrappedWritableStream) { 
    wrappedWritableStream.on('error', this.errorHandler); 
    this.wrappedWritableStream = wrappedWritableStream; 
} 
// ... 
write(chunk, encoding, callback) { 
    if (this.hadError) { 
     // Note: until callback is called, this function won't be called again, so we will have maximum one stored 
     // chunk. 
     this.bufferedChunk = [chunk, encoding, callback]; 
    } else { 
     wrappedWritableStream.write(chunk, encoding, callback); 
    } 
} 
// ... 
errorHandler(err) { 
    console.error(err); 
    this.hadError = err; 
    this.emit(err); 
} 
// ... 
recoverFromError() { 
    if (this.bufferedChunk) { 
     wrappedWritableStream.write(...this.bufferedChunk); 
     this.bufferedChunk = undefined; 
    } 
    this.hadError = false; 
} 

注ような何か:あなたが唯一のwrite機能を実装する必要があるはずですが、私の周りを掘ると、他の実装機能とプレイすることをお勧めします。

エラーイベントを発生したストリームに書き込むのに問題があるかもしれませんが、解決するための別の問題として残しておきます。

ここ https://nodejs.org/en/docs/guides/backpressuring-in-streams/この1のための恵みを開始することに興味

関連する問題