2017-06-13 9 views
1

PythonOperatorでマクロを使用できますか?私は次のことを試みましたが、レンダリングされたマクロを取得できませんでした!気流Python演算子のマクロ

dag = DAG(
    'temp', 
    default_args=default_args, 
    description='temp dag', 
    schedule_interval=timedelta(days=1)) 

def temp_def(a, b, **kwargs): 
    print '{{ds}}' 
    print '{{execution_date}}' 
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) 

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = PythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 

答えて

7

マクロは、テンプレートフィールドに対してのみ処理されます。 Jinjaにこのフィールドを処理させるには、PythonOperatorをあなた自身で拡張してください。

class MyPythonOperator(PythonOperator): 
    template_fields = ('templates_dict','op_args') 

PythonOperator自体がテンプレートこのフィールドを持っているので、私はtemplate_fields'templates_dict'を追加しました: PythonOperator

今、あなたはそのフィールド内のマクロを使用することができるはずです。私には

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 
+1

これを正しい回答としてマークできますか?正しい答えですので –

+1

下位互換性のために、 'template_fields = PythonOperator.template_fields +( 'op_args'、)'のように 'template_fields'を無効にすることができます。ところで、私は[PythonOperator'テンプレートフィールドに 'op_args'と' op_kwargs'を追加するために[JIRA]を開きました(https://issues.apache.org/jira/browse/AIRFLOW-1814) –

1

をこれ以上のネイティブのAirflowの方法は、付属のPythonOperatorを使用し、provide_context=Trueパラメータをそのまま使用することです。

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    provide_context=True, 
    dag=dag) 

今、あなたはあなたがタスクに関連付けられているいくつかのカスタム定義されたparamsを持っていた場合、あなたがそれらにアクセスすることができ、あなたの呼び出し可能

def temp_def(**kwargs): 
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date'])) 

kwargsでマクロのすべてにアクセスでき、気流のメタデータとタスクのパラメータを持っています同様にkwargs['params']

+0

これはおそらく、している。私の答えは、マクロがなぜ処理されていないのかという特定の問題を主な対象としていました。 – jhnclvr