2016-04-09 2 views
0

私はelasticsearchにインデックスを作成しようとしている巨大なjsonファイルを継承しました(実際はデータベースではありませんが、ほとんどのdb摂取)。私は摂取を行うためにノードを使用しています。私はストリームと非同期を試みましたが、私はこの問題に近づくためのフレームがありません。メモリのオーバーフローなどはありません。node.jsを使用して非常に大きなjsonファイルをデータベースに取り込みます

私は1に1を投稿することはできませんが、それは効果的のようなものに見える多次元オブジェクトだ:私はちょうどドキュメントを摂取する必要が

[ 
    { 
    document: { 
     type: 1, 
     type2: 2, 
     type3: {...} 
    }, 
    {...} 
] 

を、私はelasticsearchクライアントを使用して一括でそれらを処理することができます。ストリームを遅くし、解析し、チャンクする必要があります。

完全にスタック...ヘルプstackoverflow金曜日です。私は家に帰りたいです。 )。

+1

https://www.npmjs.com/package/json-parse-streamを試しましたか? – migg

+0

これはjsonストリーム解析の1つのバージョンかもしれませんが、私はこれを試してみます。 – unsalted

+0

okこれでいくつかの進歩を遂げるが、このモジュールが解決策になるかもしれない。 – unsalted

答えて

1

私が試した3番目のjsonストリームライブラリであるjson-parse-streamのmiggの提案に基づいて、私はついに働くインジェストを持っています。私がこれを書いている間、それは実際に問題なく走っています。誰かがこれを役に立つと思うでしょう。

const fs = require('graceful-fs'); 
const parse = require('json-parse-stream'); 
const es = require('event-stream'); 
const client = new require('elasticsearch').Client(); 
var WritableBulk = require('elasticsearch-streams').WritableBulk; 
var TransformToBulk = require('elasticsearch-streams').TransformToBulk; 


var rs = fs.createReadStream('./resources/mydoc.json'); 

var bulkExec = function (body, callback) { 
    console.log(body); 
    client.bulk({ 
    index: 'my_index', 
    type: 'my_type', 
    body: body 
    }, callback); 
}; 

var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; }); 


const done = (err, res) => { 
    if (err) { 
    console.log(err); 
    } 
    console.log(res); 
    console.log('go get a drink you deserve it'); 
}; 

var ws = new WritableBulk(bulkExec); 

rs.pipe(parse()) 
.pipe(es.mapSync(function (element) { 
    var a = []; 
    if (element.key === 'document') { 
    a = element.value; 
    return a; 
    } 
})) 
.pipe(toBulk) 
.pipe(ws).on('finish', done); 
+0

あなたの答えはここに入れてください。 – migg

関連する問題