2017-02-20 8 views
0

私はノードストリームに精通していますが、コードを抽象化するためのベストプラクティスに苦労しています。実際の作業はtransform()で起こるノード - パイプラインを関数に抽象化する

inputStream 
.pipe(csv.parse({columns:true}) 
.pipe(csv.transform(function(row) {return transform(row); })) 
.pipe(csv.stringify({header: true}) 
.pipe(outputStream); 

は、ここで私が今日書いているもののストリップダウンバージョンです。実際に変更されるのはinputStream,transform()outputStreamです。私が言ったように、これは私が実際に使用しているものの剥奪されたバージョンです。私は、多くのエラー処理と各パイプステップのログを取っています。これは最終的にコードを抽象化しようとしています。私が書くために探しています何

はそうのような単一のパイプ段階、である:私は理解するのに苦労しています何

inputStream 
.pipe(csvFunction(transform(row))) 
.pipe(outputStream); 

ストリームを受け入れ、単一の関数にそれらのパイプのステップを有効にする方法で、ストリームを返します。私はthrough2のような図書館を見てきましたが、私はどこに行くのか私にはどうなっているのか分かりません。

答えて

0

ここで私は一緒に行くことになったものです。私はthrough2ライブラリとstreaming API of the csv libraryを使って、私が探していたパイプ関数を作成しました。

var csv = require('csv'); 
    through = require('through2'); 

module.exports = function(transformFunc) { 
    parser = csv.parse({columns:true, relax_column_count:true}), 
    transformer = csv.transform(function(row) { 
     return transformFunc(row); 
    }), 
    stringifier = csv.stringify({header: true}); 

    return through(function(chunk,enc,cb){ 
     var stream = this; 

      parser.on('data', function(data){ 
       transformer.write(data); 
      }); 

      transformer.on('data', function(data){ 
       stringifier.write(data); 
      }); 

      stringifier.on('data', function(data){ 
       stream.push(data); 
      }); 

      parser.write(chunk); 

      parser.removeAllListeners('data'); 
      transformer.removeAllListeners('data'); 
      stringifier.removeAllListeners('data'); 
      cb(); 
    }) 
} 

これは、私があまりにも多くのイベントリスナーを作成していたメモリエラーに実行しているためであった、私は終わりに向かってイベントリスナーを削除する部分を注目に値します。私は最初にonceでイベントを聞いてこの問題を解決しようとしましたが、その後のチャンクが読み込まれずに次のパイプステップに渡されるのを防ぎました。

フィードバックやその他のアイデアがある場合は教えてください。

2

あなたはこのようPassThroughクラスを使用することができます。

var PassThrough = require('stream').PassThrough; 

var csvStream = new PassThrough(); 
csvStream.on('pipe', function (source) { 
    // undo piping of source 
    source.unpipe(this); 
    // build own pipe-line and store internally 
    this.combinedStream = 
    source.pipe(csv.parse({columns: true})) 
     .pipe(csv.transform(function (row) { 
     return transform(row); 
     })) 
     .pipe(csv.stringify({header: true})); 
}); 

csvStream.pipe = function (dest, options) { 
    // pipe internal combined stream to dest 
    return this.combinedStream.pipe(dest, options); 
}; 

inputStream 
    .pipe(csvStream) 
    .pipe(outputStream); 
+0

変換(行)参照をストリームにどのように渡すのですか?トランスフォームストリームを使用する方が良いのではないでしょうか? PassThroughはストリームを実際に処理しているよりも監視しています。 – AdamPat

+0

ストリーム処理をどのようにパッケージ化するかによって異なります。表示されている場所に関数を置くか、参照として渡してください。もちろん、トランスフォームストリームでソリューションを実装することもできます(https://nodejs.org/api/stream.html#stream_implementing_a_transform_streamをご覧ください)。 PassThroughストリームでアイデアを見せたのは、私が見つけた単なる一番の方法でした。 – Marc