2017-04-19 9 views
0

私はいくつかのサブダグを含むメインダグ(main_dag)を使用しており、それらのサブダグのそれぞれには多くのタスクがあります。サブdagA taskAからxcomをプッシュしましたが、サブdagB taskB内でそのxcomをプルしています。 xcom_pull()のdag_id引数がデフォルトでself.dag_idになっているので、必要なxcomを取得できませんでした。私はこれをどうやってやろうと思っていましたか、このシナリオを設定するより良い方法があれば、これに対処する必要はありません。私は現在subdagBでやっているもののサブダグからxcomを引き出す

例:

def subdagB(parent_dag, child_dag, start_date, schedule_interval): 
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval) 
    start = DummyOperator(
     task_id='taskA', 
     dag=subdagB) 
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};''' 
    t1 = BashOperator(
     task_id='taskB', 
     bash_command=tag_db_template, 
     xcom_push=True, 
     dag=subdagB) 
    end = DummyOperator(
     task_id='taskC', 
     dag=subdagB) 
    t0.set_upstream(start) 
    t1.set_upstream(t0) 
    end.set_upstream(t1) 
    return subdagB 

は、任意の助けを事前にありがとうございます!

答えて

0

[Operator] .xcom_pull(dag_id = dag_id、...)または[TaskInstance] .xcom_pull(dag_id = dag_id、...)でdag_idを上書きする限り、問題はないはずです。

where dag_id = "{parent_dag_id}.{child_dag_id}"

、あなたの例では、より完全にすることができれば、私はそれをローカルに実行してみることができますが、私は(同様の)例をテストし、期待通りのクロスsubdag-xcomsが働きます。

+0

TriggerDagRunOperatorを使用すると、デフォルトでDagRunの実行日が異なるため、xcoms *を* DAGに渡すことができなくなります。 dagrun_operator.pyをカスタマイズして、呼び出し元のDAGのexecution_dateをそのまま使用して回避することができます。 – jastang

関連する問題