2017-08-05 5 views
1

MongoDBサーバーをクラッシュさせることなく、多くのデータを効率的に挿入しようとしています(XMLファイルのサイズが70GBを超えています)。現在、これは私がNodeJSでxml-streamを使用してやっているものです:私はxml.on()を呼び出すときストリーミングされたXMLデータデータベースを挿入

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    assert = require('assert'), 
    ObjectId = require('mongodb').ObjectID, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
var xml = new XmlStream(stream); 

xml.collect('ns:Statistik'); 
xml.on('endElement: ns:Statistik', function(item) { 
    var insertDocument = function(db, callback) { 
     db.collection('vehicles').insertOne(item, function(err, result) { 
      amount++; 
      if (amount % 1000 == 0) { 
       console.log("Inserted", amount); 
      } 
      callback(); 
     }); 
    }; 

    MongoClient.connect(url, function(err, db) { 
     insertDocument(db, function() { 
      db.close(); 
     }); 
    }); 
}); 

それは基本的に私は、現在の午前ツリー/要素を返します。これはまっすぐなJSONなので、私はdb.collection().insertOne()関数にパラメータとして渡すことができ、それをデータベースに挿入します。

すべてのコードは実際と同じように動作しますが、約3000回の挿入(約10秒かかる)後に停止します。データベース接続を開いてデータを挿入してから、XMLファイルのツリーが表示されるたびに接続を閉じる(この場合は約3000回)と思われます。

何とかinsertMany()機能を組み込み、100s(またはそれ以上)のチャンクにすることはできますが、ストリームと非同期でどのように動作するかはわかりません。

私の質問は次のとおりです.MongoDBデータベースに多量のXMLをJSONに挿入するにはどうすればクラッシュするのですか?

答えて

1

.insertMany()は毎回書くよりも良いと思われるので、実際には"stream"のデータを収集するだけです。

実行が「非同期」であるので、あなたは通常ので、通常あなた.pause()"stream"コールバックが完了したら、.insertMany()、その後.resume()を呼び出す前に、スタック上であまりにも多くのアクティブコールを持つ避けたい:

var fs = require('fs'), 
    path = require('path'), 
    XmlStream = require('xml-stream'), 
    MongoClient = require('mongodb').MongoClient, 
    url = 'mongodb://username:[email protected]:27017/mydatabase', 
    amount = 0; 

MongoClient.connect(url, function(err, db) { 

    var stream = fs.createReadStream(path.join(__dirname, 'motor.xml')); 
    var xml = new XmlStream(stream); 

    var docs = []; 
    //xml.collect('ns:Statistik'); 

    // This is your event for the element matches 
    xml.on('endElement: ns:Statistik', function(item) { 
     docs.push(item);   // collect to array for insertMany 
     amount++; 

     if (amount % 1000 === 0) { 
      xml.pause();    // pause the stream events 
      db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      docs = [];    // clear the array 
      xml.resume();   // resume the stream events 
      }); 
     } 
    }); 

    // End stream handler - insert remaining and close connection 
    xml.on("end",function() { 
     if (amount % 1000 !== 0) { 
     db.collection('vehicles').insertMany(docs, function(err, result) { 
      if (err) throw err; 
      db.close(); 
     }); 
     } else { 
     db.close(); 
     } 
    }); 

}); 

あるいはややそれを近代化:

const fs = require('fs'), 
     path = require('path'), 
     XmlStream = require('xml-stream'), 
     MongoClient = require('mongodb').MongoClient; 

const uri = 'mongodb://username:[email protected]:27017/mydatabase'; 

(async function() { 

    let amount = 0, 
     docs = [], 
     db; 

    try { 

    db = await MongoClient.connect(uri); 

    const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')), 
      xml = new XmlStream(stream); 

    await Promise((resolve,reject) => { 
     xml.on('endElement: ns:Statistik', async (item) => { 
     docs.push(item); 
     amount++; 

     if (amount % 1000 === 0) { 
      try { 
      xml.pause(); 
      await db.collection('vehicle').insertMany(docs); 
      docs = []; 
      xml.resume(); 
      } catch(e) { 
      reject(e) 
      } 
     } 

     }); 

     xml.on('end',resolve); 

     xml.on('error',reject); 
    }); 

    if (amount % 1000 !== 0) { 
     await db.collection('vehicle').insertMany(docs); 
    } 

    } catch(e) { 
    console.error(e); 
    } finally { 
    db.close(); 
    } 

})(); 

注意をMongoClient接続は、実際に他のすべての操作をラップしていること。 を一度だけに接続し、その他の操作は"stream"のイベントハンドラで行われます。

したがって、XMLStreamの場合、イベントハンドラが式一致でトリガされ、データが抽出されて配列に収集されます。 1000回ごとに.insertMany()が呼び出され、文書が挿入され、「非同期」呼び出しで「一時停止」および「再開」されます。

完了すると、「終了」イベントが"stream"で発生します。ここでデータベース接続を閉じ、イベントループを解放してプログラムを終了します。

さまざまな.insertMany()コールを一度に(通常は「プールされたサイズ」にコールスタックをオーバーランしないように)許可することによって、ある程度の「並列性」を得ることができますが、これは基本的にプロセスがどのように見えるかです他の非同期I/Oが完了するのを待っている間、単純に一時停止するだけで、最も単純な形式で実行できます。

NOTE:これは必要ではないと思われるfollow up questionごとにあなたの元のコードから.collect()方法をコメントアウトし、実際には本当にデータベースへの各書き込みの後に廃棄すべきメモリ内のノードを保持されます。

+0

ああ、男の子、それは動作するように見えます!私は基本的に自分のやることをやっていますが、オープンなつながりで周りを包み込むことはできませんでした。私の問題は、それは私に非常に矛盾した結果をもたらしたということでした。 1000レコードを挿入した場合、実際には300レコードしかデータベースに表示されません(その周りに)。おそらく、私はそれが終わる前にランダムな時間に接続を閉じるだけだからです。ありがとう、ニール! – MortenMoulder

+0

別の注記:本当に始まる理由を知っていますか?約75000のインサートの後に遅くなりますか?私たちは、データベースが空のときは1000 /秒、75000前後のときは100〜200 /秒を話しています。 – MortenMoulder

+0

@MortenMoulder '.insertMany()'を使って改善が見られるはずですが、スループットはデータの量によって異なりますが、まったく違った、本当に広いテーマです。索引が存在する場合、使用可能なメモリー、書き込みの分散、および基本ハードウェアなど、詳細なしに検討する要素は多すぎます。他にご質問がある場合は、[詳細を明記できる] [https://stackoverflow.com/questions/ask]にお問い合わせください。 –