2017-08-16 9 views
0

大きなCSVファイルを読み込んで解析しようとしていましたが、行ごとに非同期計算をいくつか行い、だから、私はPromise pを作り、多くの.then(xxx)を連鎖しようとし、csvの最後には.then(yyy)という最後のカウントを出力する。約束の中のいくつかのものが実行されていません

ただし、この数値は加算されません。しかし、私がp = p.then(xxx)p = p.then(yyy)を行うと、その数は(より小さいcsvファイルの場合)加算されますが、(大容​​量のcsvファイルの場合)時にはメモリリークに直面します。

私が間違っていたことはありますか?

var fs = require('fs') 
const csv = require('fast-csv'); 
var Promise = require('bluebird') 
var count = 0; 
var actual = 0; 
let p = Promise.resolve(); 
const stream = fs.createReadStream(`/Users/ssmlee/Desktop/KingKong_Sims_5M.txt`); 
const csvStream = csv({ 
    delimiter: ';' 
}) 
.on('data', (row) => { 
    count++ 
    if (count % 10000 === 0) { 
     console.log(count) 
     console.log(process.memoryUsage().heapUsed) 
    } 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    return Promise.resolve().delay(5) 
    .then(function() { 
     actual++ 
    }) 
    }); 
}) 
.on('end',() => { 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    console.log(actual); // 4999977 or something like this 
    console.log(count); // 5000000 
    }); 
}); 
stream.pipe(csvStream); 
+1

なぜ、動作するスキームがメモリをリークすると思いますか? 'p = p.then(...)'には何も問題ありません。その構造自体は、メモリリークを引き起こしません。 – jfriend00

+0

私は 'process.memoryUsage()。heapUsed'を使ってメモリ使用量をチェックしています。メモリはガベージコレクションされていません。おそらく5m行のランダムなファイルを生成して、これが起こっているのを知ることができます –

+0

これを実装する方法が選択されているため、大量のメモリを使用しています。あなたは、何十万という約束を同時に飛行させるのではなく、物事を適切に順序づけることによって、より少ない記憶を使うことができます。たとえば、最初の行を読み込み、CSVストリームを一時停止し、非同期操作を行い、CSVストリームの解放が完了すると、エンドツーエンドのタイミングを速めるためにいくつかの操作を並行して実行したいが、妥当なメモリー使用量が必要な場合は、50万ではなく、一度に(おそらく10個の)操作を並列に保つ必要があります。 – jfriend00

答えて

1

あなたが遅延してactual数を増やすませんが、決して(離れthenの結果を投げて)約束を待つ場合は、ストリームがすでに起きていないすべての増分で終わるかもしれません。あなたの例では、23コールバックはまだ5msの遅延を待っています。 Btw、これらのすべてを同じに連結するp = Promise.resolve()はあまり意味がありません。ただすぐにすべてを実行できます。

p = p.then(…)を実行している場合、非常に長い約束チェーンを構築します。これはメモリをリークさせるべきではありませんが、多くのメモリを使用します - 5msの遅延はすべて連続して連鎖し、スクリプトは実行するには少なくとも25000秒かかるでしょう。ファイルは最初に読み込まれ、何百万もの約束事が生成された後、次々と解決されます(ガベージコレクションされる可能性があります)。

このような連続的なアプローチを実行するには、おそらくストリームの背圧システムを使用する必要があります。

しかし、あなたは一度にあまりにも多くの生きているの約束を持つ、並列の遅延をも待つことができます。

p = Promise.all([p, Promise.delay(5)]).then(() => { 
    actual++; 
}); 
+0

私の理解は、あなたが非常に長い約束を持っている場合と、5mの約束がすべて解決された後に約束が解決される場合の両方についてです。最初のケースでは、最後の連鎖した約束として 'on( 'end')'の中に '.then()'があります。それらの2つは同じものではありませんか? –

+1

@ Shih-MinLeeいいえ、 'p'を変更していなければ、すべてのコールバックは全く同じ約束事(' Promise.resolve() ')に結びついています。それは長鎖ではなく、木です。そして、すべてのコールバックはすぐに実行されます( 'Promise.resolve()'が解決されるとすぐに)*お互いを待っていない*。 – Bergi

+0

Gotcha。ありがとう。 –

1

さてあなたは約束を並列に実行するので、それらをチェーンすることはできません。

allp = []; 
.... 
.on('data', (row) => { 
    ... 
    allp.push(p.then(() => {...})); 
} 
... 
.on('end',() => { 
Promise.all(allp).then(() => {}) 

もちろん、イベントごとに1つのプロミスを作成しています。

最後までに約束を解除する必要がある場合は、これを自分で行う必要があります。

ので、あなたが行うことができますが、唯一その副作用(増加数)で、約束の戻り値に興味があるように思えません

.on('data', (row) => { 
    ... 
    if (allp.length > 50) allp = [Promise.all(allp).then(()=>null)]; 
    allp.push(p.then(() => {...})); 
} 

50個の約束がグループ化されますこうすることで、彼らは解決した後と、彼らは

.then(()=>null)はPromise.allから結果の配列があまりにも破棄されることを保証(...次の50になります)、単一の約束によって置き換えられます。 (代わりに1つのnullの約束はallpになります)

これはPromise.allの実装に依存します。 Promise.allがそれぞれの約束を解決したときに(そしてその結果が利用可能な場合)、これは完璧です。

Promise.allが50の約束をすべて待ってからすべて解放すると、50の各グループに1つの非常に長い実行予定がない限り、これはまだ機能します。


延期約束の反パターンを使用することができます。

開始時に1つの延期約束を作成します。値1データ

.on('data', (row) => { 
    ... 
    asyncRunningCount++; 
    p.then(() => {work})) 
    .then(() { 
     asyncRunningCount--; 
     if (asyncRunningCount == 0) resolve(); // no more tasks running 
    }); 
} 

.on('end',() => { 
    asyncRunningCount--; 
    // remove the 1 that was set on start. No more new tasks will be added 
    if (asyncRunningCount == 0) resolve(); // no more tasks running 
    p2.then(() => { all done }) 

上で

var resolve; 
var asyncRunningCount = 1; // start with 1 
var p2 = new Promise(function() { 
    resolve = arguments[0]; 
}); 

起動時にタスクを実行するのカウントを一時的に(端部)には0

に低下した場合、分解されることから、P2を防ぎ1が減分される。すべてのタスクが完了すると、asyncRunningCountは0になります。これは、オン(終了)のデクリメントまたはオン(データ)のデクリメントによって発生します。

p2.thenは、すべてのタスクが終了すると実行されます。

他の約束事はすべて終了します。 実際には(データ)をオンにすると約束は必要ありません。ちょうどあなたの非同期タスクを開始し、非同期タスクが減少asyncRunningCountを行って、そして0


をチェックされたときに、データが非常に高速で来る場合、これはまだ約束の多くは並行して開始することを、意味しています。 しかし、あなたが約束を開始しない場合は、着信データを格納する必要があるので、メモリはいずれかの方法で使用されます。

+0

はい、どちらのソリューションも機能しますが、なぜそんなに複雑になりますか?最初のアプローチでは、毎回「Promise.all」を実行するのではなく、配列に50の約束が溜まるのを待つのはなぜですか? 2番目のアプローチでは、遅延した反パターンを使用する理由はありません。イベントエミッタ全体を 'new Promise'コールバックの中に移動するだけです。 (もちろん、それでもまだ 'Promise.all'反パターン:D) – Bergi

+0

*すべての*イベントのためにPromise.allを実行すると(1回目の多分を除いて、1つしかないかもしれません)、次に2回作成しますPromise.allをまったく使用しなかった場合と同じように、多くの約束事(1回に2回未満)を約束します。はい、彼らはすべて完了したらすぐに解放されるので、記憶は問題ではありません。しかし、実行時間が問題になる可能性があります... – Martin

+0

線形の時間とメモリの複雑さを持っている限り、私は通常気にしません:-)実行時間は非同期作業(OPの例では 'delay(5)')によって縛られます。 – Bergi

関連する問題