0
ラムダ関数を使用してawsでs3バケットからmysql RDSインスタンスにデータを挿入しようとしています。私はsqlalchemyを使用してmysqlエンドポイントに接続しました。私はデータにいくつかの変更を加えたいと思います。列名を変更してから、RDSインスタンスのテーブルにマップできるようにインデックスを再作成しました。問題はdf.columns行にあります。文字列形式で列名を取得する代わりに、それらをタプルとして取得しています。以下awsで文字列の代わりにテーブルの列名を取得する
+-----------------+-------------+----------------------+---------------+---------
| ('col_a',) | ('date_timestamp',) | ('col_b',) | ('col_c',) | (vehicle_id',) |
+-----------------+-------------+----------------------+---------------+---------
| 0.180008333 | 2017-09-28T20:36:00Z | -6.1487501 | 38.35 | 1004 |
| 0.809708333 | 2017-06-17T14:16:00Z | 8.189424 | -6.8732784 | NominalValue |
+-----------------+-------------+----------------------+---------------+---------
コードである -
from __future__ import print_function
import boto3
import json
import logging
import pymysql
from sqlalchemy import create_engine
from pandas.io import sql
from pandas.io.json import json_normalize
from datetime import datetime
print('Loading function')
s3 = boto3.client('s3')
def getEngine(endpoint):
engine_ = None
try:
engine_ = create_engine(endpoint)
except Exception as e:
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
return engine_
engine = getEngine('mysql+pymysql://username:[email protected]/database')
configuration = {
"aTable":
{
"from" : ['col_1','col_2','date_timestamp','operator_id'],
"to" : ['date_timestamp','operator_id','col_1','col_2'],
"sql_table_name" : 'sql_table_a'
},
"bTable" : {
"from" : ['col_a','date_timestamp','col_b','col_c','vehicle_id'],
"to" : ['date_timestamp','col_a','col_b','vehicle_id','col_c'],
"sql_table_name" : 'sql_table_b'
}
}
def handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
s3_object_key = event['Records'][0]['s3']['object']['key']
obj = s3.get_object(Bucket=bucket, Key=s3_object_key)
data = json.loads(obj['Body'].read())
for _key in data:
if not _key in configuration:
print("No configuration found for {0}".format(_key))
df = json_normalize(data[str(_key)])
df.columns=[configuration[_key]['from']]
#df = df.reindex(indexlist,axis="columns")
#df['date_timestamp'] = df['date_timestamp'].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%SZ"))
df.to_sql(name=configuration[_key]['sql_table_name'], con=engine, if_exists='append', index=False)
print(df)
return "Loaded data in RDS"