2016-11-11 14 views

私はDynamoからMysql DBにデータを移行するためのスクリプトを作成しました。 最初はAsyncを使用していませんでしたが、SQL側でボトルネックが発生し始めたので、非同期libを使用してdymanoの部分を「スロットル」することにしました。 問題:ダイナモにデータがある限り、途中で再帰があります(超単純なETL)。しかし、滝の内部で再帰を実行する方法はわかりません。 マイコード:再帰が起こる場所非同期でのwaterfallとnodejsの再帰

function main() { 
    async.waterfall([getMaxTimestamp, scanDynamoDB, printout, saveToMySQL], function(err, result) { 
     if(err) console.log(err) 

function getMaxTimestamp(callback) { 
    console.time("max query"); 
    connection.query("SELECT MAX(created_at) as start_date from Tracking;", function(err, data) { 
     console.timeEnd("max query"); 
     callback(err, data); 

function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 
    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     callback(err, data); 
     // if (!err) { 
     //  if (data != undefined && data.Count > 0) { 
     //   printout(data.Items) // Print out the subset of results. 
     //   if (data.LastEvaluatedKey) { // Result is incomplete; there is more to come. 
     //    query.ExclusiveStartKey = data.LastEvaluatedKey; 
     //    scanDynamoDB(query); 
     //   } 
     //  } else { 
     //   console.log('No fresh data found on Dynamo') 
     // } else console.dir(err); 

function assembleSql() { 
    insertSql = "insert into Tracking ("; 
    for (var i = 0; i < headers.length; i++) { 
     insertSql += headers[i]; 
     if (i < headers.length - 1) 
      insertSql += ","; 

    insertSql += ") values ?;" 
    previousInsertSql = insertSql; 

function saveToMySQL(items, callback) { 
    console.time("insert sql") 
    connection.query(insertSql, [items], function(err, result) { 
     console.timeEnd("insert sql") 
     if (err){ 
      callback(err, null) 

     totalInserts += result.affectedRows; 
     callback(err, totalInserts) 

function printout(items, callback) { 
    var headersMap = {}; 
    var values; 
    var header; 
    var value; 

    var out = []; 

    if (headers.length == 0) { 
     if (items.length > 0) { 
      for (var i = 0; i < items.length; i++) { 
       for (var key in items[i]) { 
        headersMap[key] = true; 
     for (var key in headersMap) { 

    for (index in items) { 
     values = []; 
     for (i = 0; i < headers.length; i++) { 
      value = ""; 
      header = headers[i]; 
      // Loop through the header rows, adding values if they exist 
      if (items[index].hasOwnProperty(header)) { 
       if (items[index][header].N) { 
        value = items[index][header].N; 
       } else if (items[index][header].S) { 
        value = items[index][header].S; 
       } else if (items[index][header].SS) { 
        value = items[index][header].SS.toString(); 
       } else if (items[index][header].NS) { 
        value = items[index][header].NS.toString(); 
       } else if (items[index][header].B) { 
        value = items[index][header].B.toString('base64'); 
       } else if (items[index][header].M) { 
        value = JSON.stringify(items[index][header].M); 
       } else if (items[index][header].L) { 
        value = JSON.stringify(items[index][header].L); 
       } else if (items[index][header].BOOL !== undefined) { 
        value = items[index][header].BOOL.toString(); 
    callback(null, out); 






async.whilst(function() { return canInsert}, function (callback){ 
      scanDynamoDB(query, callback) 
     }, function(err, res) {} 
function scanDynamoDB(data, callback) { 
    console.time("dynamo read"); 

    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     if (!err) { 
      if (data != undefined && data.Count > 0) { 
       canInsert = data.LastEvaluatedKey; 
       if (data.LastEvaluatedKey) // Result is incomplete; there is more to come. 
        query.ExclusiveStartKey = data.LastEvaluatedKey; 
     } else console.dir(err); 
私はちょうど while(canInsert)でそれを行っている可能性




function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 

    var result = []; // for accumulate data of each query 

    function readNext(err, data) { 
     if (err) 
      return callback(err); 

     if (!data || !data.Count) 
      return callback(null, result); 

     // add data to result 

     dynamoDB.scan(query, readNext); 

    dynamoDB.scan(query, readNext); 

私はここで間違っているかもしれませんが、そうすることで、私は滝の鎖をカットします、そうですか?つまり、すべてのデータを読み込んだら、引き続きmysqlに保存します。そうですか?そして助けてくれてありがとう! – Leonardo


はい、waterfallはすべてのデータを待機します。プロセスデータをパートごとに処理する必要がある場合は、 'readNext'がそれを行う必要があります。 –


ええ、それはどうしたのですか? readNextを使って?ちょっと新しいasync – Leonardo