私はいくつかのサブダグを含むメインダグ(main_dag)を使用しており、それらのサブダグのそれぞれには多くのタスクがあります。サブdagA taskAからxcomをプッシュしましたが、サブdagB taskB内でそのxcomをプルしています。 xcom_pull()のdag_id引数がデフォルトでself.dag_idになっているので、必要なxcomを取得できませんでした。私はこれをどうやってやろうと思っていましたか、このシナリオを設定するより良い方法があれば、これに対処する必要はありません。私は現在subdagBでやっているもののサブダグからxcomを引き出す
例:
def subdagB(parent_dag, child_dag, start_date, schedule_interval):
subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
start = DummyOperator(
task_id='taskA',
dag=subdagB)
tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
t1 = BashOperator(
task_id='taskB',
bash_command=tag_db_template,
xcom_push=True,
dag=subdagB)
end = DummyOperator(
task_id='taskC',
dag=subdagB)
t0.set_upstream(start)
t1.set_upstream(t0)
end.set_upstream(t1)
return subdagB
は、任意の助けを事前にありがとうございます!
TriggerDagRunOperatorを使用すると、デフォルトでDagRunの実行日が異なるため、xcoms *を* DAGに渡すことができなくなります。 dagrun_operator.pyをカスタマイズして、呼び出し元のDAGのexecution_dateをそのまま使用して回避することができます。 – jastang