2016-12-15 5 views
0

NodeJSでは、大きなファイルを読み込み、フラグメント(n行)で処理し、データを消費してから次のn行を処理したいと考えています。ノードストリーム:n行を変換して変換して続行する

私はいくつかのモジュール(fs、es-stream、main-onesのためのノード-ele)を使ってみたいと思っています。

新しい行を処理する前に私がやったベストは(コードは以下です)、変換の完了を待たずにいました。

ここに私のスニペット:

const fs = require('fs'); 
const es = require('event-stream'); 
const parse = require('csv-parse'); 
const stringify = require('csv-stringify'); 
const etl = require('etl'); 

exports.toDatabase = (file, done) => { 

    // File contains six lines wit htwo values (example : aa:bb for the first line) 
    let input = fs.createReadStream(todayTeamsFile); 

    input 
    .pipe(es.split()) 
    .pipe(etl.collect(2)) 
    .pipe(es.map((data, nextMap) => { 
     // I'd like to process all this code before continuing to read my stream 
     let date = Date.now(); 
     console.log('map data ' + date); 
     console.log(data); 

     parse(data[0], { 
     delimiter: ';' 
     }, (err, output) => { 
     console.log('Row done ' + date); 
     // Treatment to do would be to insert in database the output 
     console.log(output); 
     console.log('------ ' + date); 
     return nextMap(); 
     }); 

    })); 

}; 

しかし、次のマップが最初の呼び出し

 
TESTING !! 
map data 1481824486765 
[ 'aa;zz', 'bb;cc' ] 
map data 1481824486771 
[ 'dd;ee', 'ff;gg' ] 
Row done 1481824486765 
[ [ 'aa', 'zz' ] ] 
------ 1481824486765 
Row done 1481824486771 
[ [ 'dd', 'ee' ] ] 
------ 1481824486771 
map data 1481824486785 
[ 'hh;ii', '' ] 
Row done 1481824486785 
[ [ 'hh', 'ii' ] ] 
------ 1481824486785 

答えて

0

なぜわからないの完了前に起動されたことを出力ショーチャンクされたストリームをチャンクしたいのですが、ここでは、

var through = require('through2'); 
var split = require('split') 
var fs = require('fs') 
var handler = (function(len) { 
    var buff = []; 
    var p = 0; 
    return through.obj(function(chunk, enc, cb) { 
    buff.push(chunk); 
    p++; 
    if(buff.length===len) { 
     console.log('send--------------'); 
     this.push(buff); 
     buff = []; 
    } 
    if (p>25) { 
     this.emit('error', 'kill the stream') 
    } 
    cb(); 
    }, function (cb) { 
    this.push(buff); // may be much larger than 4, it may need a logic to re split. 
    buff = []; 
    cb(); 
    }); 
})(4); 

var origin = fs.createReadStream('tomate.csv'); 

origin 
.pipe(split()) 
.pipe(handler) 
.pipe(through.obj(function(chunk, enc, cb){ 
    console.log('process: %v', chunk); 
    cb() 
})); 

handler.on('error', function() { 
    origin.close(); 
    // still need o unpipe everything, use mississipi 
}); 
+0

こんにちは、ありがとうございました! ここでは一時停止が行われますが、コードの詳細を教えてもらえますか? しかし、このスニペットは行単位で行い、一度にn行は送信しません。 チャンクストリームを再チャンクしてどういう意味ですか? ここで考えているのは、データベースの一括挿入を検証して実行するために、いくつかの行(20000を超える例100)を抽出することです。挿入に失敗した場合は読み込みを中止してください。 – Steph0

+0

私はあなたのために発見しようとすることができますいくつかの質問をください。あなたの問題について説明したところでは、 'buff.forEach(this.push.bind(this));の代わりに' this.push(buff); 'を実行し、次のストリームハンドラでは'受け取った配列のn個のアイテム(SQLをビルドし、サーバーに送信し、エラーがあれば=> this.emit(error '、err);)、SQLプロセスを実行中にストリームの同期を維持するために、 sql.cb内のstream.cb。 –

+0

文字列の代わりにオブジェクトを渡すには、through.objを使用する必要があることを忘れていました。 –

関連する問題