2017-04-05 13 views
0

私は動的にタスクを追加した以下のワークフローを作成しました。しかし、AirflowはTask,JoinおよびQTノードをDagに追加することはできません。グラフ表示では、STARTENDノードしか表示されません。私がここで間違っていることは何ですか?気流動的タスクの作成に関する問題

ありがとうございました。

dag = DAG(
    'ddl_ver1', 
    default_args=default_args, 
    schedule_interval='*/5 * * * *' 
) 


start_node = DummyOperator(task_id='ddl_start', 
         dag=dag) 


end_node = DummyOperator(task_id='ddl_finish', 
         dag=dag) 


def create_qts(account_id): 
    qts = [] 
    for i in range(7): 
     qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i), 
         dag=dag) 
     qts.append(query) 

    return qts 



def create_data_discovery_tasks(accounts): 
    for account_id in accounts: 
     task = DummyOperator(
      task_id='ddl_task_' + str(account_id), 
      dag=dag) 

     join = DummyOperator(
      task_id='ddl_join_' + str(account_id), 
      dag=dag) 

     qts = create_qts(account_id) 

     for qt in qts: 
      qt.set_upstream(task) 
      qt.set_downstream(join) 

     task.set_upstream(START) 

     join.set_downstream(END) 

答えて

0

create_qtsとは決して実際には呼ばれません。

開始時に開始し、生成されたタスクを中間に置き、終了時に終了する場合は、set_downstreamまたはset_upstreamのいずれかを呼び出す必要があります。

これは正常に動作します(非常に最後の行に注意してください):

from airflow.models import DAG 
from airflow.operators.dummy_operator import DummyOperator 
from datetime import datetime 

dag = DAG(
    'ddl_ver1', 
    schedule_interval='*/5 * * * *', 
    start_date=datetime(2017,4,30) 
) 


start_node = DummyOperator(task_id='ddl_start', 
         dag=dag) 


end_node = DummyOperator(task_id='ddl_finish', 
         dag=dag) 


def create_qts(account_id): 
    previous_qt = None 
    for i in range(7): 

     qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i), 
         dag=dag) 

     if previous_qt: 
      previous_qt.set_downstream(qt) 
     else: 
      start_node.set_downstream(qt) 

     previous_qt = qt 

    qt.set_downstream(end_node) 

create_qts(123) 
関連する問題