2016-11-11 14 views
0

私はDynamoからMysql DBにデータを移行するためのスクリプトを作成しました。 最初はAsyncを使用していませんでしたが、SQL側でボトルネックが発生し始めたので、非同期libを使用してdymanoの部分を「スロットル」することにしました。 問題:ダイナモにデータがある限り、途中で再帰があります(超単純なETL)。しかし、滝の内部で再帰を実行する方法はわかりません。 マイコード:再帰が起こる場所非同期でのwaterfallとnodejsの再帰

function main() { 
    async.waterfall([getMaxTimestamp, scanDynamoDB, printout, saveToMySQL], function(err, result) { 
     if(err) console.log(err) 
     console.log(result) 
    }); 
} 

function getMaxTimestamp(callback) { 
    console.time("max query"); 
    connection.query("SELECT MAX(created_at) as start_date from Tracking;", function(err, data) { 
     console.timeEnd("max query"); 
     callback(err, data); 
    }) 
} 

function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 
    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     callback(err, data); 
     // if (!err) { 
     //  if (data != undefined && data.Count > 0) { 
     //   printout(data.Items) // Print out the subset of results. 
     //   if (data.LastEvaluatedKey) { // Result is incomplete; there is more to come. 
     //    query.ExclusiveStartKey = data.LastEvaluatedKey; 
     //    scanDynamoDB(query); 
     //   } 
     //  } else { 
     //   console.log('No fresh data found on Dynamo') 
     // } else console.dir(err); 
    }); 
}; 

function assembleSql() { 
    insertSql = "insert into Tracking ("; 
    for (var i = 0; i < headers.length; i++) { 
     insertSql += headers[i]; 
     if (i < headers.length - 1) 
      insertSql += ","; 
    } 

    insertSql += ") values ?;" 
    previousInsertSql = insertSql; 
} 

function saveToMySQL(items, callback) { 
    assembleSql(); 
    //connection.connect(); 
    console.time("insert sql") 
    connection.query(insertSql, [items], function(err, result) { 
     console.timeEnd("insert sql") 
     if (err){ 
      callback(err, null) 
      return; 
     } 

     totalInserts += result.affectedRows; 
     callback(err, totalInserts) 
     //connection.end(); 
    }) 
} 

function printout(items, callback) { 
    var headersMap = {}; 
    var values; 
    var header; 
    var value; 

    var out = []; 

    if (headers.length == 0) { 
     if (items.length > 0) { 
      for (var i = 0; i < items.length; i++) { 
       for (var key in items[i]) { 
        headersMap[key] = true; 
       } 
      } 
     } 
     for (var key in headersMap) { 
      headers.push(key); 
     } 
    } 

    for (index in items) { 
     values = []; 
     for (i = 0; i < headers.length; i++) { 
      value = ""; 
      header = headers[i]; 
      // Loop through the header rows, adding values if they exist 
      if (items[index].hasOwnProperty(header)) { 
       if (items[index][header].N) { 
        value = items[index][header].N; 
       } else if (items[index][header].S) { 
        value = items[index][header].S; 
       } else if (items[index][header].SS) { 
        value = items[index][header].SS.toString(); 
       } else if (items[index][header].NS) { 
        value = items[index][header].NS.toString(); 
       } else if (items[index][header].B) { 
        value = items[index][header].B.toString('base64'); 
       } else if (items[index][header].M) { 
        value = JSON.stringify(items[index][header].M); 
       } else if (items[index][header].L) { 
        value = JSON.stringify(items[index][header].L); 
       } else if (items[index][header].BOOL !== undefined) { 
        value = items[index][header].BOOL.toString(); 
       } 
      } 
      values.push(value) 
     } 
     out.push(values) 
    } 
    callback(null, out); 
} 
main(); 

コメント部分があるが、私はどこ私の流れの内側にこれを配置する場所を知りません!

助けていただけたら幸いです!

答えて

0

以下のように表示されていない状態で、再帰的に呼び出すことができます。

async.whilst(function() { return canInsert}, function (callback){ 
      scanDynamoDB(query, callback) 
     }, function(err, res) {} 
function scanDynamoDB(data, callback) { 
    console.time("dynamo read"); 

    dynamoDB.scan(query, function(err, data) { 
     console.timeEnd("dynamo read"); 
     if (!err) { 
      if (data != undefined && data.Count > 0) { 
       canInsert = data.LastEvaluatedKey; 
       if (data.LastEvaluatedKey) // Result is incomplete; there is more to come. 
        query.ExclusiveStartKey = data.LastEvaluatedKey; 
      } 
     } else console.dir(err); 
    }); 
}; 
私はちょうど while(canInsert)でそれを行っている可能性

。とにかく、私は再帰を避け、メモリ使用量は方法の方が低いです。

0

データを取得中にscanDynamoDBのコールバック関数をコールしないでください。あなたは、追加の機能を実装し、エラーが実際に私が自分でそれを把握することができた

function scanDynamoDB(data, callback) { 
    if (data[0].start_date != null && data[0].start_date) 
     query.ExpressionAttributeValues[':v_ca'].N = data[0].start_date; 

    console.time("dynamo read"); 

    var result = []; // for accumulate data of each query 

    function readNext(err, data) { 
     if (err) 
      return callback(err); 

     if (!data || !data.Count) 
      return callback(null, result); 

     // add data to result 

     dynamoDB.scan(query, readNext); 
    } 

    dynamoDB.scan(query, readNext); 
}; 
+0

私はここで間違っているかもしれませんが、そうすることで、私は滝の鎖をカットします、そうですか?つまり、すべてのデータを読み込んだら、引き続きmysqlに保存します。そうですか?そして助けてくれてありがとう! – Leonardo

+0

はい、waterfallはすべてのデータを待機します。プロセスデータをパートごとに処理する必要がある場合は、 'readNext'がそれを行う必要があります。 –

+0

ええ、それはどうしたのですか? readNextを使って?ちょっと新しいasync – Leonardo