2017-05-18 19 views
1

変数{{ds}}にある実行日を渡します。しかし、私はそれを実行日付を取得しない関数を渡しました。ダグ内の関数にds変数を渡すにはどうすればよいですか?

def get_spark_step_2(date): 
     #logic in here 
     return step 

exec_date = '{{ ds }}' 

step_adder2 = EmrAddStepsOperator(
    task_id='create_parquets', 
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
    aws_conn_id='aws_default', 
    steps=get_spark_step_2(exec_date), 
    dag=dag 
) 

上記のコンテキストで変数を使用する方法をご存知ですか?

答えて

2

EmrAddStepsOperatorを拡張するクラスを作成し、stepsをテンプレート化フィールドにします。このような

何か:自身が唯一のテンプレートフィールドとしてjob_flow_idあり

class MyEmrAddStepsOperator(EmrAddStepsOperator): 

    template_fields = ['job_flow_id','steps'] 

EmrAddStepsOperator

class EmrAddStepsOperator(BaseOperator): 
    """ 
    An operator that adds steps to an existing EMR job_flow. 
    :param job_flow_id: id of the JobFlow to add steps to 
    :type job_flow_name: str 
    :param aws_conn_id: aws connection to uses 
    :type aws_conn_id: str 
    :param steps: boto3 style steps to be added to the jobflow 
    :type steps: list 
    """ 
    template_fields = ['job_flow_id'] 

あなたが唯一のマクロを使用することができます(dsなど)の分野でテンプレート化されています。

+1

ありがとうございます!このばかげた質問を申し訳ありませんが、その小さなバリエーションだけで、_init_を定義してその拡張クラスで実行する必要がありますか? – ebertbm

+0

何も追加する必要はありません – jhnclvr

関連する問題