2016-11-22 3 views
2

Python関数のdefault_args start_dateを参照することは可能ですか?エアフローETLパイプライン - 機能のスケジュール日付を使用していますか?

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2016, 11, 21), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=1) 
} 

私のPythonスクリプトは、主にこの文を発行するために、サブプロセスを使用しています。

query = '"SELECT * FROM {}.dbo.{} WHERE row_date = \'{}\'"'.format(database,                select_database(database)[table_int], 
                     query_date) 
command = 'BCP {} queryout \"{}\" -t, -c -a 10240 -S "server" -T'.format(query, os.path.join(path, filename)) 

私が実行したいタスクを照会するために、BCPを使用している「を選択*テーブルからどこ日付= {}」。現在、私のPythonスクリプトには日付変数のロジックがすべて含まれています(デフォルトは昨日)。しかし、代わりにdefault_argを参照して、気流が日付を処理するようにするとよいでしょう。

簡単にするために、私はdefault_arg start_dateとスケジュール(毎日実行)を使用して、BCPコマンドで変数を入力したいと考えています。これは適切なアプローチですか、私はPythonスクリプトで日付のロジックを維持する必要がありますか?

答えて

3

これは正しいアプローチですが、実際に必要なのはexecution_dateで、start_dateではありません。 provide_context=Trueパラメータを使用して、execution_dateをデフォルトの変数'ds'としてPythonOperatorのコンテキストで取得できます。 provide_context=Trueパラメータは、Jinjaテンプレートで使用されるデフォルト変数のセットをkwargs引数として渡します。デフォルト変数とJinjaテンプレートの詳細については、ドキュメントの該当するセクションを参照してください。 https://airflow.incubator.apache.org/code.html#default-variables https://airflow.incubator.apache.org/concepts.html#jinja-templating

あなたのコードは、次のようになります。

def query_db(**kwargs): 
    #get execution date in format YYYY-MM-DD 
    query_date = kwargs.get('ds') 

    #rest of your logic 

t_query_db = PythonOperator( task_id='query_db', python_callable=query_db, provide_context=True, dag=dag)

+0

クールに、私はこの実際に素早くで遊んでみましょう。最初は混乱していましたが、その後、「ds」と「yesterday_ds」などを示すAPIドキュメントのMacrosセクションが見つかりました。 – trench

+0

はい、本当に混乱しやすいテーマであるため、もっと明確にしておくべきです。私は答えを更新したので、うまくいけばそれは少し良く説明されます。 –

関連する問題