2017-08-02 7 views
1

をDynamoDBのは?私はそれを毎分をトリガーするCloudWatchのを使用してラムダ関数を作ってみました。私はこの作業を完了するためにAWSの他のサービスを利用することに門戸を開いています。私はただ見落としているという簡単な説明があると確信しています。転送データがSQSから私が手にメッセージを転送し、DynamoDBのにそれらを介して送信する方法はあり

*私は私のコードに修正またはこれを達成するための別の解決策のいずれかを探しています、私のコードは動作しません編集します。

**編集作業それを得ました。

'use strict'; 
const AWS = require('aws-sdk'); 

const SQS = new AWS.SQS({ apiVersion: '2012-11-05' }); 
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' }); 

const QUEUE_URL = 'SQS_URL'; 
const PROCESS_MESSAGE = 'process-message'; 
const DYNAMO_TABLE = 'TABLE_NAME'; 

function poll(functionName, callback) { 
    const params = { 
     QueueUrl: QUEUE_URL, 
     MaxNumberOfMessages: 10, 
     VisibilityTimeout: 10 
    }; 

    // batch request messages 
    SQS.receiveMessage(params, function(err, data) { 
     if (err) { 
      return callback(err); 
     } 

     // parse each message 
     data.Messages.forEach(parseSQSMessage); 
    }) 
    .promise() 
    .then(function(){ 
     return Lambda.invokeAsync({}) 
     .promise() 
     .then(function(data){ 
      console.log('Recursion'); 
     }) 
    } 
    ) 
    .then(function(){context.succeed()}).catch(function(err){context.fail(err, err.stack)}); 
} 

// send each event in message to dynamoDB. 
// remove message from queue 
function parseSQSMessage(msg, index, array) { 

    // delete SQS message 
    var params = { 
     QueueUrl: QUEUE_URL, 
     ReceiptHandle: msg.ReceiptHandle 
    }; 

    SQS.deleteMessage(params, function(err, data) { 
     if (err) console.log(err, err.stack); // an error occurred 
     else  console.log(data);   // successful response 
    }); 
} 

// store atomic event JSON directly to dynamoDB 
function storeEvent(event) { 
    var params = { 
     TableName : DYNAMO_TABLE, 
     Item: event 
    }; 

    var docClient = new AWS.DynamoDB.DocumentClient(); 

    docClient.put(params, function(err, data) { 
     if (err) console.log(err); 
     else console.log(data); 
    }); 
} 

exports.handler = (event, context, callback) => { 
    try { 
     // invoked by schedule 
     poll(context.functionName, callback); 
    } catch (err) { 
     callback(err); 
    } 
}; 
+0

SQSからメッセージを引き出し、DynamoDBのに保存したノードで書かれたAWSラムダ関数との関連githubのプロジェクト:https://github.com/leaflevellabs/aws-lambda-sqs-dynamodb。 – jarmod

+0

コードは機能しませんか?エラーメッセージが表示されますか?あなたの質問は何です - あなたはそれが可能だかどうか尋ねている、またはあなたのコードがない理由/動作しませんか?質問を編集して明確にしてください。 –

答えて

0
var aws = require("aws-sdk"); 


// get configuration defaults from config file. 
var tableName = 'Table_Name'; 
var queueUrl = 'SQS_URL'; 

var dbClient = new aws.DynamoDB.DocumentClient(); 
var sqsClient = new aws.SQS(); 



// get config values from dynamodb - if the config values are found, then override existing values 
// this will occur on every execution of the lambda which will allow real time configuration changes. 
var updateConfig = function updateConfigValues(invokedFunction, cb) { 

    var params = { 
      TableName: "Table_NAME", 

     Key: { 
      "KEY": "KEY" 
     } 

    }; 

    dbClient.get(params, function(err, data) { 

     if(err) { 
      console.log("ERR_DYNAMODB_GET", err, params); 
     } 
     else if(!data || !data.Item) { 
      console.log("INFO_DYNAMODB_NOCONFIG", params); 
     } 
     else { 
      queueUrl = data.Item.config.queueUrl; 
      tableName = data.Item.config.tableName; 
     } 

     return cb(err); 
    }); 

}; 

// save the email to dynamodb using conditional write to ignore addresses already in the db 
var saveEmail = function saveEmail(messageBody, cb) { 


    var params = { 
     TableName:tableName, 
     Item:messageBody, 
     ConditionExpression : "attribute_not_exists(clickId)", 
    }; 

    dbClient.put(params, function(err, data) { 
     cb(err, data); 
    }); 
}; 

var deleteMessage = function deleteMessage(receiptHandle, cb) { 

    var params = { 
     QueueUrl: queueUrl, 
     ReceiptHandle: receiptHandle 
    }; 

    sqsClient.deleteMessage(params, function(err, data) { 
     cb(err, data); 
    }); 

} 

exports.handler = function(event, context) { 

    updateConfig(context.invokedFunctionArn, function(err) { 

     if(err) { 
      context.done(err); 
      return; 
     } 

     console.log("INFO_LAMBDA_EVENT", event); 
     console.log("INFO_LAMBDA_CONTEXT", context); 

     sqsClient.receiveMessage({MaxNumberOfMessages:10 , QueueUrl: queueUrl}, function(err, data) { 

      if(err) { 
       console.log("ERR_SQS_RECEIVEMESSAGE", err); 
       context.done(null); 
      } 
      else { 

       if (data && data.Messages) { 


        console.log("INFO_SQS_RESULT", " message received"); 

         var message = JSON.parse(data.Messages[0].Body); 

         var messageBody = message.Message; 

         messageBody = JSON.parse(messageBody); 




         // loops though the messages and replaces any empty strings with "N/A" 
         messageBody.forEach((item) => { 
          var item = item; 
          var custom = item.customVariables; 
          for (i = 0; i < custom.length; i++) { 
           if(custom[i] === ''){ 
            custom[i] = 'N/A'; 
           } 
           item.customVariables = custom; 
          } 
          for(variable in item) { 

           if(item[variable] === ""){ 
            item[variable] = "N/A"; 
            console.log(item); 
           } 
          } 

          var messageBody = item; 
         }); 
         var messageBody = messageBody[0]; 
         // Logs out the new messageBody 
         console.log("FIXED - ", messageBody); 

         // Checks for errors and delets from que after sent 
         saveEmail(messageBody, function(err, data) { 

          if (err && err.code && err.code === "ConditionalCheckFailedException") { 
           console.error("INFO_DYNAMODB_SAVE", messageBody + " already subscribed"); 
           deleteMessage(message.MessageId, function(err) { 
            if(!err) { 
             console.error("INFO_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, "successful"); 
            } else { 
             console.error("ERR_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, err); 
            } 
            context.done(err); 
           }); 

          } 
          else if (err) { 
           console.error("ERR_DYNAMODB_SAVE", "receipt handle: " + message.MessageId, err); 
           context.done(err); 
          } 
          else { 
           console.log("INFO_DYNAMODB_SAVE", "email_saved", "receipt handle: " + message.MessageId, messageBody.Message); 
           deleteMessage(message.MessageId, function(err) { 
            if(!err) { 
             console.error("INFO_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, "successful"); 
            } else { 
             console.error("ERR_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, err); 
            } 
            context.done(err); 
           }); 
          } 


         }); 
       } else { 
        console.log("INFO_SQS_RESULT", "0 messages received"); 
        context.done(null); 
       } 
      } 
     }); 
    }); 

} 
関連する問題