2016-08-03 12 views
0

各テストがクリアされる前にデータをテーブルにロードする一連のユニットテスト(node.js 4.x、aws-sdk、mocha)を実行していますテスト後のテーブル。DynamoDB javascript SDK batchWriteItemは書き込み容量を増やさないと完了しません

ConditionCheckFailedExceptionをトリガーするConditionExpressionが原因で2つのテストに失敗しました。しかし、私が読み書き可能な容量を増やすと、テストは合格になります。

SDKはスロットルの例外を処理して再試行するので、なぜ私のテストは遅く実行されないのですか?代わりに、テストがscan - >batchWriteItemのプロセスを完了できないように見えるので、新しいテストが開始されたときにテーブルにまだレコードが残っています。

私はチームメンバーから、同様の問題を抱えていると言われており、問題を解決するためにスループットが向上しました。これは私と一緒に座っていません。私は何かが間違っていると私のテストで競合状態があるか、または抑制されたときに私の操作が完了するように実装できるパターンがあるはずですか?スループットを増やす必要があるときには、スロットルメトリックを使用して通知する必要がありますが、メモリが不足するまで再試行を続けることができます。

他に誰かがこの問題にぶつかり、問題を処理するために何をしましたか?

答えて

0

デバッグ後、私はUnprocessedItems応答要素に気付きました。調べた後、UnprocessedItemsin the docs私はもっと詳しく読んだはずです。以下のコードは、遅延(指数バックオフ)を伴う再試行ループを実行します:

var clearEventTable = function (tableName, client, cleared) { 
    var exclusiveStartKey = null; 
    var retryCount = 0; 

    var read = function(query, callback) { 
    client.scan(query, function (err, page) { 
     if(err) { 
     console.log(err); 
     return callback(err); 
     } 

     retryCount = 0; 
     exclusiveStartKey = page.LastEvaluatedKey || null; 
     if(page.Count == 0) { 
     return callback(null, {}); 
     } 

     if(page.Count < 25 && exclusiveStartKey) { 
     console.log("read capacity limit reached: " + JSON.stringify(page, null, 2)); 
     } 

     var keys = _.map(page.Items, function(n) { 
     return { DeleteRequest: { Key: n } }; 
     }); 

     var batch = { 
     RequestItems: {}, 
     ReturnConsumedCapacity: "INDEXES", 
     ReturnItemCollectionMetrics: "SIZE" 
     }; 

     batch.RequestItems[tableName] = keys; 

     callback(null, batch); 
    }); 
    }; 

    var write = function(batch, callback) { 
    if(batch && batch.RequestItems){ 
     client.batchWriteItem(batch, function(err, result) { 
     if(err) { 
      console.log(err); 
      return callback(err); 
     } 

     if(Object.keys(result.UnprocessedItems).length !== 0) { 
      console.log("Retry batchWriteItem: " + JSON.stringify(result, null, 2)); 
      retryCount++; 
      var retry = { 
      RequestItems: result.UnprocessedItems, 
      ReturnConsumedCapacity: "INDEXES", 
      ReturnItemCollectionMetrics: "SIZE" 
      }; 
      // retry with exponential backoff 
      var delay = retryCount > 0 ? (50 * Math.pow(2, retryCount - 1)) : 0; 
      setTimeout(write(retry, callback), delay); 
      return; 
     } 

     callback(null, result); 
     }); 
    } else { 
     callback(null); 
    } 
    }; 

    var params = { 
    TableName: tableName, 
    ProjectionExpression: "aggregateId,id", 
    Limit: 25, // max 25 per batchWriteItem 
    ConsistentRead: false, 
    ReturnConsumedCapacity: "TOTAL" 
    }; 

    async.doWhilst(function (next) { 
    // retrieve entities 
    if (exclusiveStartKey) 
     params.ExclusiveStartKey = exclusiveStartKey; 

    async.compose(write, read)(params, function (err, result) { 
     if (err) next(err); 
     else next(null, result); 
    }); 
    }, function() { 
    // test if we need to load more 
    return exclusiveStartKey !== null; 
    }, function (err, r) { 
    // return results 
    if (err) { 
     console.log(err); 
     return cleared(err); 
    } 
    return cleared(null);; 
    }); 
}; 
関連する問題