0
私は動的にタスクを追加した以下のワークフローを作成しました。しかし、AirflowはTask
,Join
およびQT
ノードをDagに追加することはできません。グラフ表示では、START
とEND
ノードしか表示されません。私がここで間違っていることは何ですか?気流動的タスクの作成に関する問題
ありがとうございました。
dag = DAG(
'ddl_ver1',
default_args=default_args,
schedule_interval='*/5 * * * *'
)
start_node = DummyOperator(task_id='ddl_start',
dag=dag)
end_node = DummyOperator(task_id='ddl_finish',
dag=dag)
def create_qts(account_id):
qts = []
for i in range(7):
qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i),
dag=dag)
qts.append(query)
return qts
def create_data_discovery_tasks(accounts):
for account_id in accounts:
task = DummyOperator(
task_id='ddl_task_' + str(account_id),
dag=dag)
join = DummyOperator(
task_id='ddl_join_' + str(account_id),
dag=dag)
qts = create_qts(account_id)
for qt in qts:
qt.set_upstream(task)
qt.set_downstream(join)
task.set_upstream(START)
join.set_downstream(END)