私は投票申請を収集し、それらをCassandraに格納するNode Appを持っています。投票は、base64でコード化された暗号化された文字列として格納されます。 APIには/export
というエンドポイントがあり、すべてのこれらの投票文字列(おそらく100万以上)を取得し、それらをバイナリに変換し、後でvote.egdファイルに追加する必要があります。そのファイルは圧縮されてクライアントに送信されます。私の考えは、各投票文字列をバイナリに変換してWriteStream
に書き込んで、Cassandraから行をストリームすることです。 この機能をPromiseで包み込み、使いやすいようにします。私は以下を持っています:Cassandraからバックプレッシャーを考慮したファイルへのストリームデータ
streamVotesToFile(query, validVotesFileBasename) {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);
writeStream.on('error', (err) => {
logger.error(`Writestream ${validVotesFileBasename}.egd error`);
reject(err);
});
writeStream.on('drain',() => {
logger.info(`Writestream ${validVotesFileBasename}.egd error`);
})
db.client.stream(query)
.on('readable', function() {
let row = this.read();
while (row) {
const envelope = new Buffer(row.vote, 'base64');
if(!writeStream.write(envelope + '\n')) {
logger.error(`Couldn't write vote`);
}
row = this.read()
}
})
.on('end',() => { // No more rows from Cassandra
writeStream.end();
writeStream.on('finish',() => {
logger.info(`Stream done writing`);
resolve();
});
})
.on('error', (err) => { // err is a response error from Cassandra
reject(err);
});
});
}
これを実行すると、すべての投票がファイルに追加され、正常にダウンロードされます。しかし、私が持っている問題/質問の束があります
それはアプリに他のすべての要求を実行している間、私は
/export
エンドポイントへのreqを作成し、この関数が実行されている場合は非常に遅いですまたはちょうどありませんエクスポート要求が完了する前に終了します。私は、イベントループがカッサンドラのストリーム(毎秒千人)からのこれらの出来事のすべてによって騒がれているので、推測していますか?すべての票がファイルに書かれているようですが、ほとんどすべて
writeStream.write()
コールではfalse
となり、対応するログメッセージ(コード参照)が表示されます。WritableStreamのバックプレッシャと 'drain'イベントを考慮する必要があることを理想的には私は
pipe()
を使用し、バックプレッシャサポート(右?)が組み込まれているため投票をパイプにパイプするのが理想です。各行を処理する(バイナリに変換し、将来他の行フィールドから他のデータを追加する可能性があります)、どのようにパイプで行うのでしょうか?
意味があります。しかし、私があなたのコードを使用して^ ^エラーが発生しました:TypeError:ValidChunk(_stream_writable.js:216:10)のTransform.Writable.write(_stream_writable.js:245:12)のResultStreamで無効な非文字列/バッファチャンクResultStream.Readable.read(_stream_readable.js:381:10)のResultStream.emit(events.js:188:7)のemitOne(events.js:96:13)の.ondata(_stream_readable.js:555:20) – gcosta
クライアントストリームはobjectModeのReadable Streams2オブジェクトを返すので、ああ、myTransformはobjectModeのTransformストリームである必要があります。 (objectModeをtrueに設定します)。ありがとう! – gcosta