2017-12-06 19 views
0

ラムダ関数を使用してawsでs3バケットからmysql RDSインスタンスにデータを挿入しようとしています。私はsqlalchemyを使用してmysqlエンドポイントに接続しました。私はデータにいくつかの変更を加えたいと思います。列名を変更してから、RDSインスタンスのテーブルにマップできるようにインデックスを再作成しました。問題はdf.columns行にあります。文字列形式で列名を取得する代わりに、それらをタプルとして取得しています。以下awsで文字列の代わりにテーブルの列名を取得する

+-----------------+-------------+----------------------+---------------+--------- 
| ('col_a',) | ('date_timestamp',) | ('col_b',) | ('col_c',) | (vehicle_id',) | 
+-----------------+-------------+----------------------+---------------+--------- 
| 0.180008333 | 2017-09-28T20:36:00Z | -6.1487501 | 38.35  |  1004  |   
| 0.809708333 | 2017-06-17T14:16:00Z | 8.189424 | -6.8732784 | NominalValue | 
+-----------------+-------------+----------------------+---------------+--------- 

コードである -

from __future__ import print_function 
import boto3 
import json 
import logging 
import pymysql 
from sqlalchemy import create_engine 
from pandas.io import sql 
from pandas.io.json import json_normalize 
from datetime import datetime 
print('Loading function') 

s3 = boto3.client('s3') 
def getEngine(endpoint): 
    engine_ = None 
    try: 
     engine_ = create_engine(endpoint) 
    except Exception as e: 
     print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) 
     raise e 
    return engine_ 
engine = getEngine('mysql+pymysql://username:[email protected]/database') 

configuration = { 
    "aTable": 
    { 
     "from" : ['col_1','col_2','date_timestamp','operator_id'], 
     "to" : ['date_timestamp','operator_id','col_1','col_2'], 
     "sql_table_name" : 'sql_table_a' 
    }, 
    "bTable" : { 
     "from" : ['col_a','date_timestamp','col_b','col_c','vehicle_id'], 
     "to" : ['date_timestamp','col_a','col_b','vehicle_id','col_c'], 
     "sql_table_name" : 'sql_table_b' 
    } 
} 

def handler(event, context): 
    bucket = event['Records'][0]['s3']['bucket']['name'] 
    s3_object_key = event['Records'][0]['s3']['object']['key'] 
    obj = s3.get_object(Bucket=bucket, Key=s3_object_key) 
    data = json.loads(obj['Body'].read()) 
    for _key in data: 
     if not _key in configuration: 
      print("No configuration found for {0}".format(_key)) 
     df = json_normalize(data[str(_key)]) 
     df.columns=[configuration[_key]['from']] 
     #df = df.reindex(indexlist,axis="columns") 
     #df['date_timestamp'] = df['date_timestamp'].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%SZ")) 
     df.to_sql(name=configuration[_key]['sql_table_name'], con=engine, if_exists='append', index=False) 
    print(df) 
    return "Loaded data in RDS" 

答えて

0

我々は、行のコードから[]を削除してください -

df.columns=[configuration[_key]['from']] 

正しいコードは

df.columns=configuration[_key]['from'] 
あります
関連する問題