0
NodeJSでは、大きなファイルを読み込み、フラグメント(n行)で処理し、データを消費してから次のn行を処理したいと考えています。ノードストリーム:n行を変換して変換して続行する
私はいくつかのモジュール(fs、es-stream、main-onesのためのノード-ele)を使ってみたいと思っています。
新しい行を処理する前に私がやったベストは(コードは以下です)、変換の完了を待たずにいました。
ここに私のスニペット:
const fs = require('fs');
const es = require('event-stream');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const etl = require('etl');
exports.toDatabase = (file, done) => {
// File contains six lines wit htwo values (example : aa:bb for the first line)
let input = fs.createReadStream(todayTeamsFile);
input
.pipe(es.split())
.pipe(etl.collect(2))
.pipe(es.map((data, nextMap) => {
// I'd like to process all this code before continuing to read my stream
let date = Date.now();
console.log('map data ' + date);
console.log(data);
parse(data[0], {
delimiter: ';'
}, (err, output) => {
console.log('Row done ' + date);
// Treatment to do would be to insert in database the output
console.log(output);
console.log('------ ' + date);
return nextMap();
});
}));
};
しかし、次のマップが最初の呼び出し
TESTING !! map data 1481824486765 [ 'aa;zz', 'bb;cc' ] map data 1481824486771 [ 'dd;ee', 'ff;gg' ] Row done 1481824486765 [ [ 'aa', 'zz' ] ] ------ 1481824486765 Row done 1481824486771 [ [ 'dd', 'ee' ] ] ------ 1481824486771 map data 1481824486785 [ 'hh;ii', '' ] Row done 1481824486785 [ [ 'hh', 'ii' ] ] ------ 1481824486785
こんにちは、ありがとうございました! ここでは一時停止が行われますが、コードの詳細を教えてもらえますか? しかし、このスニペットは行単位で行い、一度にn行は送信しません。 チャンクストリームを再チャンクしてどういう意味ですか? ここで考えているのは、データベースの一括挿入を検証して実行するために、いくつかの行(20000を超える例100)を抽出することです。挿入に失敗した場合は読み込みを中止してください。 – Steph0
私はあなたのために発見しようとすることができますいくつかの質問をください。あなたの問題について説明したところでは、 'buff.forEach(this.push.bind(this));の代わりに' this.push(buff); 'を実行し、次のストリームハンドラでは'受け取った配列のn個のアイテム(SQLをビルドし、サーバーに送信し、エラーがあれば=> this.emit(error '、err);)、SQLプロセスを実行中にストリームの同期を維持するために、 sql.cb内のstream.cb。 –
文字列の代わりにオブジェクトを渡すには、through.objを使用する必要があることを忘れていました。 –