2017-11-29 22 views
3

私は時間ごとに実行されるスパークバッチジョブを持っています。各実行は、S3に新しいデータを生成して格納し、ディレクトリの名前付けパターンはDATA/YEAR=?/MONTH=?/DATE=?/datafileです。AWSでMSCK REPAIR TABLEを自動的に実行する方法Athena

S3にデータをアップロードした後、Athenaを使用して調査します。さらに、データソースとしてAthenaに接続することで、それらをQuickSightに視覚化したいと思います。

問題は、私が手動でクエリMSCK REPARI TABLEを実行しないかぎり、私のSparkバッチを実行するたびに、S3に保存された新しく生成されたデータはアテナでは検出されません。

アテナが自動的にデータを更新する方法はありますか?完全自動データビジュアライゼーションパイプラインを作成する方法はありますか?

答えて

3

このタスクをスケジュールする方法はいくつかあります。ワークフローをどのようにスケジュールするのですか? AirflowLuigiAzkaban、cronのようなシステムを使用していますか、AWS Data pipelineを使用していますか?

これらのいずれかから、次のCLIコマンドを起動できます。

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"

別のオプションはAWS Lambdaだろう。あなたは、S3への新しいアップロードに応じてMSCK REPAIR TABLE some_database.some_tableを呼び出す関数を持つことができます。

例ラムダ関数は、次のような記述できます。

import boto3 

def lambda_handler(event, context): 
    bucket_name = 'some_bucket' 

    client = boto3.client('athena') 

    config = { 
     'OutputLocation': 's3://' + bucket_name + '/', 
     'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'} 

    } 

    # Query Execution Parameters 
    sql = 'MSCK REPAIR TABLE some_database.some_table' 
    context = {'Database': 'some_database'} 

    client.start_query_execution(QueryString = sql, 
           QueryExecutionContext = context, 
           ResultConfiguration = config) 

新しいデータがあなたのバケット内DATA/接頭辞の下に追加されたときあなたは、あなたのラムダ関数を実行するトリガーを構成します。

最終的に、ジョブスケジューラを使用してスパークジョブを実行した後でパーティションを明示的に再構築すると、自己文書化の利点があります。一方、AWS Lambdaはこのような仕事に便利です。