2017-01-21 22 views
1

私はcsv-to-json、CSVファイルを処理するためのきちんとしたライブラリを使用しています。Node.jsで読み取り可能なストリームを一時停止する

大規模な(200万行以上)CSVを処理してDBに挿入する必要があるユースケースがあります。

これを行うにはメモリの問題に遭遇せずに、CSVをストリームとして処理し、10000行ごとにストリームを一時停止し、DBに行を挿入してからストリームを再開します。

何らかの理由で私はpauseストリームに見えません。

例えば、次のコードを取る:

const rs = fs.createReadStream("./foo.csv"); 
rs.pause(); 

let count = 0; 

csv() 
.fromStream(rs) 
.on("json", (json) => { 
    count++; 
    console.log(count); 
}) 
.on("done",() => { 
    cb(null, count); 
}) 
.on("error", (err) => { 
    cb(err); 
}) 

countは(それは私が私のCSVを持っているどのように多くの行です)200回記録されます - ストリームが通過する前に一時停止されているので、私は何かをログに記録しないことを期待していましたそれ以上にfromStream()

+0

だあなたは、データベース内の時間の挿入に1行をしているの?キューを作成し、同時に実行するリクエストを制限したり、メモリリークを防ぎ、リクエストをフラッシュしないように非同期メソッドを使用するのはなぜですか? –

+1

@AsifSeed私は、ストリームを一時停止する以外の何かに興味がありません。それが実行可能かどうかについての情報です。どうもありがとうございます。 –

答えて

1

は、ここでは、このIssueで追跡ライブラリの作成者によって提案された解決策、です:

var tmpArr=[]; 
rs.pipe(csv({},{objectMode:true})).pipe(new Writable({ 
    write: function(json, encoding,callback){ 
    tmpArr.push(json); 
    if (tmpArr.length===10000){ 
     myDb.save(tmpArr,function(){ 
     tmpArr=[]; 
     callback(); 
     }) 
    }else{ 
     callback(); 
    } 
    } , 
    objectMode:true 
})) 
.on('finish',function(){ 
    if (tmpArr.length>0){ 
    myDb.save(tmpArr,function(){ 
     tmpArr=[]; 
    }) 
    } 
}) 

私は実際にエミュレートするために管理してきましたそのようにポピュラーで一時停止しますが、理想的ではありません:

let count = 0; 
var csvParser=csv() 
.fromStream(rs) 
.on("json", (json) => { 
    rows.push(json); 
    if (rows.length % 1000 === 0) { 
    rs.unpipe(); 
    // clear `rows` right after `unpipe` 
    const entries = rows; 
    rows = []; 
    this._insertEntries(db, entries,()=> { 
     rs.pipe(csvParser); 
    }); 
    } 
}) 
+0

書き込み可能なストリームを使用すると、一時停止して、 。共有ありがとう! – Johnny

1

csv2jsonライブラリを変更しない限り行うことはできません。

この

はあなたがrs.pauseをしたときに最初に
https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states

ストリームが一時停止モードになって読むべきリンクです()。たとえそれをしないとしても、読み込み可能なストリームは一時停止モードで開始されます。

ストリームは、3つのシナリオでresumeに入ります。

  • どちらか.on('data')イベントリスナや
  • .pipe()接続方法や
  • readable.resume()が明示的に呼び出されているがあります。あなたのケースでは

fromStream()方法は、このように、ストリームを再開し、あなたの読めるストリームに添付pipe方法があります。

リファレンスコード:
https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378

Converter.prototype.fromStream=function(readStream,cb){ 
    if (cb && typeof cb ==="function"){ 
    this.wrapCallback(cb); 
    } 
    process.nextTick(function(){ 
    readStream.pipe(this); 
    }.bind(this)) 
    return this; 
} 
0

私はcsvtojsonもfromString(...)メソッドを持っているという事実を利用し、以下のように使用しました。

  1. line-by-lineパッケージを使用して、固定数の行、つまり10000を読み取り、配列に格納します。
  2. lr.pause()を使用する行単位の一時停止
  3. インデックス0
  4. で挿入ヘッダー行は、(あなたのcsvファイルは、ヘッダ行を持っているならば、ライン・バイ・ラインリーダーによって返された最初の行を無視するように簡単な条件文を使用してください)あなたを与えるだろうEOL文字ですべての行に参加そのCSVファイルの10000行の文字列表現。
  5. csvtojsonの.fromString(...)を使用して、ブロックの文字列表現をjsonオブジェクトに変換し、dbに挿入します。
  6. lr.resume()でストリームを再開し、行単位のリーダーで'end'イベントが発生するまで繰り返します。

はここで完全なコード

const CSVToJSON = require("csvtojson"); 
const LineByLineReader = require("line-by-line"); 
const { EOL } = require("os"); 

const BLOCK_LIMIT = 10000; 

let lines = []; 
let isFirstLineProcessed = false; 

const lr = new LineByLineReader("./foo.csv"); 

lr 
.on("line", (line) => { 

    // remove this if statement if your CSV does not contain headers line 
    if (!isFirstLineProcessed) { 
     isFirstLineProcessed = true; 
     return; 
    } 

    lines.push(line); 

    if (lines.length === BLOCK_LIMIT) { 
     lr.pause(); 

     // insert headers string ("field1, field2, ...") at index 0; 
     lines.splice(0, 0, headers); 

     // join all lines using newline operator ("\n") to form a valid csv string 
     const csvBlockString = lines.join(EOL); 
     const entries = []; 

     lines = [];  

     csv() 
      .fromString(csvBlockString) 
      .on("json", (json) => { 
       entries.push(json); 
      }) 
      .on("done",() => { 
       this._insertEntries(db, entries,()=> { 
        lr.resume(); 
       }); 
      }); 
    } 
}) 
.on("end",() => { 
    console.log("done"); 
}); 
関連する問題