2017-02-17 16 views
13

我々は、約500Kの要素で(xml-streamを使用して)XMLファイルを読み込むと、このようにMongoDBに挿入して行うのです:node.jsの切断時にMongoDBの挿入をバッファリングするには?

xml.on(`endElement: product`, writeDataToDb.bind(this, "product")); 

挿入writeDataToDb(type, obj)には、次のようになります

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { }); 

モンゴ接続が切断され、xmlストリームが読み込まれ、コンソールにエラーメッセージが表示されます(挿入できない、切断された、EPIPEが壊れている、など)。

docsではそれは言う:あなたはのmongodプロセスをシャットダウン

、ドライバが処理操作を停止し、原因であること-1すべての操作のバッファを意味し、デフォルトでbufferMaxEntriesにそれらをバッファリングし続けます。

このバッファは実際に何をしていますか?

データを挿入してmongoサーバーを閉じると、バッファが取得され、次にmongoサーバーがバックアップされ、ネイティブドライバーは正常に再接続し、ノードはデータの挿入を再開しますが、 )が再び挿入されることはありません。

私はこのバッファとその使用方法に疑問を持ちます。

目標:

我々はMongoのは(wtimeoutによる15000millisecondsに)戻ってくるまで、バッファ内のインサートを維持するための最良の方法を探して、その後、バッファリングされた文書を挿入したり、xml.pause();xml.resume()を活用してみましょうされています私たちは成功しなかった。

基本的には、データの消失や割り込みなしで切断を処理する方法について少し助けが必要です。

+0

、XML-stream'は月に一度、バッファオブジェクトを挿入し、 '使用してドキュメントやテストの例の両方を、これを複製することはできませんgoサーバがバックアップされています。もっとコードを投稿したり、あなたの設定に関するいくつかの情報をもっと与えることはできますか? – cviejo

+0

@cviejoスクリプトは会社に関連しているので共有できませんが、これを複製しようとしたスクリプトを送ってもらえますか?要点/ペーストビンは大丈夫でしょう。 – DanFromGermany

答えて

1

私は特にMongodbドライバとこのエントリのバッファについては分かりません。たぶん特定のシナリオでデータを保持するだけかもしれません。

私はこの質問に、どのデータベースでも使用できるより一般的な方法で答えていきます。

を使用すると、2つの問題を抱えて、要約すると:
  • XMLストリームが

  • は、最初の問題を処理するためにあまりにも速くデータを送信

    1. あなたは失敗から回復していないが、あなたが実装する必要がありますあきらめる前に多くの試みが行われるようにする再試行アルゴリズム。

      第2の問題を処理するには、xmlストリームに背圧をかける必要があります。 pauseメソッド、resumeメソッド、および入力バッファを使用してこれを行うことができます。

      var Promise = require('bluebird'); 
      var fs = require('fs'); 
      var Xml = require('xml-stream'); 
      
      var fileStream = fs.createReadStream('myFile.xml'); 
      var xml = new Xml(fileStream); 
      
      // simple exponential retry algorithm based on promises 
      function exponentialRetry(task, initialDelay, maxDelay, maxRetry) { 
          var delay = initialDelay; 
          var retry = 0; 
          var closure = function() { 
           return task().catch(function(error) { 
            retry++; 
            if (retry > maxRetry) { 
             throw error 
            } 
            var promise = Promise.delay(delay).then(closure); 
            delay = Math.min(delay * 2, maxDelay); 
            return promise; 
           }) 
          }; 
          return closure(); 
      } 
      
      var maxPressure = 100; 
      var currentPressure = 0; 
      var suspended = false; 
      var stopped = false; 
      var buffer = []; 
      
      // handle back pressure by storing incoming tasks in the buffer 
      // pause the xml stream as soon as we have enough tasks to work on 
      // resume it when the buffer is empty 
      function writeXmlDataWithBackPressure(product) { 
          // closure used to try to start a task 
          var tryStartTask = function() { 
           // if we have enough tasks running, pause the xml stream 
           if (!stopped && !suspended && currentPressure >= maxPressure) { 
            xml.pause(); 
            suspended = true; 
            console.log("stream paused"); 
           } 
           // if we have room to run tasks 
           if (currentPressure < maxPressure) { 
            // if we have a buffered task, start it 
            // if not, resume the xml stream 
            if (buffer.length > 0) { 
             buffer.shift()(); 
            } else if (!stopped) { 
             try { 
              xml.resume(); 
              suspended = false; 
              console.log("stream resumed"); 
             } catch (e) { 
              // the only way to know if you've reached the end of the stream 
              // xml.on('end') can be triggered BEFORE all handlers are called 
              // probably a bug of xml-stream 
              stopped = true; 
              console.log("stream end"); 
             } 
            } 
           } 
          }; 
      
          // push the task to the buffer 
          buffer.push(function() { 
           currentPressure++; 
           // use exponential retry to ensure we will try this operation 100 times before giving up 
           exponentialRetry(function() { 
            return writeDataToDb(product) 
           }, 100, 2000, 100).finally(function() { 
            currentPressure--; 
            // a task has just finished, let's try to run a new one 
            tryStartTask(); 
           }); 
          }); 
      
          // we've just buffered a task, let's try to run it 
          tryStartTask(); 
      } 
      
      // write the product to database here :) 
      function writeDataToDb(product) { 
          // the following code is here to create random delays and random failures (just for testing) 
          var timeToWrite = Math.random() * 100; 
          var failure = Math.random() > 0.5; 
          return Promise.delay(timeToWrite).then(function() { 
           if (failure) { 
            throw new Error(); 
           } 
           return null; 
          }) 
      } 
      
      xml.on('endElement: product', writeXmlDataWithBackPressure); 
      

      再生方法については、console.logを入力してください。 これはあなたの問題を解決するのに役立つことを願っています:)

    +0

    これは基本的には良い実装ですが、私は内部のwrite concern/mongoの書き込みバッファを利用できるようにしたいと考えています - [このページ](https://mongodb.github.io/node-mongodb- native/driver-articles/anintroductionto1_4_and_2_6.html)とキーワード 'bufferMaxEntries'があります。 – DanFromGermany

    2

    insertOne()で500K要素を挿入することは非常に悪い考えです。代わりに、bulk operationsを使用して、1回の要求で多くの文書を挿入できるようにする必要があります。 (ここでは例10000のために、それが50回の単一の要求で行うことができます) をバッファリングの問題を回避するには、手動で処理することができます:

    1. 無効にバッファリングの再接続のプロパティを設定し
    2. bufferMaxEntries: 0で:reconnectTries: 30, reconnectInterval: 1000
    3. bulkOperationを作成し、それを10000個のアイテムでフィードします。
    4. xmlリーダーを一時停止します。 10000アイテムを挿入してみてください。それが失敗した場合、それは
    5. を成功するまで、あなたは一括操作が実行中に中断された場合、一部重複IDの問題に直面しているので、(エラーコード:11000)、それらを無視して、すべての3000msを再試行してください

    ここではサンプルスクリプトは次のとおりです。

    var fs = require('fs') 
    var Xml = require('xml-stream') 
    
    var MongoClient = require('mongodb').MongoClient 
    var url = 'mongodb://localhost:27017/test' 
    
    MongoClient.connect(url, { 
        reconnectTries: 30, 
        reconnectInterval: 1000, 
        bufferMaxEntries: 0 
    }, function (err, db) { 
        if (err != null) { 
        console.log('connect error: ' + err) 
        } else { 
        var collection = db.collection('product') 
        var bulk = collection.initializeUnorderedBulkOp() 
        var totalSize = 500001 
        var size = 0 
    
        var fileStream = fs.createReadStream('data.xml') 
        var xml = new Xml(fileStream) 
        xml.on('endElement: product', function (product) { 
         bulk.insert(product) 
         size++ 
         // if we have enough product, save them using bulk insert 
         if (size % 10000 == 0) { 
         xml.pause() 
         bulk.execute(function (err, result) { 
          if (err == null) { 
          bulk = collection.initializeUnorderedBulkOp() 
          console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try') 
          xml.resume() 
          } else { 
          console.log('bulk insert failed: ' + err) 
          counter = 0 
          var retryInsert = setInterval(function() { 
           counter++ 
           bulk.execute(function (err, result) { 
           if (err == null) { 
            clearInterval(retryInsert) 
            bulk = collection.initializeUnorderedBulkOp() 
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
            xml.resume() 
           } else if (err.code === 11000) { // ignore duplicate ID error 
            clearInterval(retryInsert) 
            bulk = collection.initializeUnorderedBulkOp() 
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
            xml.resume() 
           } else { 
            console.log('failed after first try: ' + counter, 'error: ' + err) 
           } 
           }) 
          }, 3000) // retry every 3000ms until success 
          } 
         }) 
         } else if (size === totalSize) { 
         bulk.execute(function (err, result) { 
          if (err == null) { 
          db.close() 
          } else { 
          console.log('bulk insert failed: ' + err) 
          } 
         }) 
         } 
        }) 
        } 
    }) 
    

    サンプルログ出力:

    doc 0 : 10000 saved on first try 
    doc 10000 : 20000 saved on first try 
    doc 20000 : 30000 saved on first try 
    [...] 
    bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown 
    failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0 
    failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0 
    failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0 
    doc 130000 : 140000 saved after 4 tries 
    doc 140000 : 150000 saved on first try 
    [...] 
    
    +0

    あなたの回答はmongo書き込みバッファに関する情報を提供せず、レプリカセットまたは切断時の変更時にもすべてのドキュメントを挿入する方法に関する解決法はありません。一括挿入についての情報は興味深いものです。私はそれを見ていきます。ありがとう! – DanFromGermany

    +0

    @DanFromGermanyはい、間違った問題を解決しようとしているように見えます。実際の問題は、アプリがデータベースから切断されるということです。データベースへの呼び出しが少なくなると、自動再接続が容易になり、書き込みバッファリングが不要になります – felix

    +0

    私のアプリケーション**はデータベースから切断されません**。 **再接続してすべてのデータを書き込むために、レプリカセットアサートの** disconnects *または* master-switchesの場合**のアプリケーションを書いてみたい。 – DanFromGermany

    関連する問題