2017-08-17 12 views
2

私はクライアントのリストを持っているユースケースを持っています。クライアントはリストに追加または削除することができ、開始日や開始パラメータが異なることがあります。エアフロー:動的サブダッグの作成

エアフローを使用して、最初の開始日と何かが失敗した場合の再実行に基づいて、各クライアントのすべてのデータをバックフィルします。私は各クライアントのSubDagを作成することを考えています。これは私の問題に対処しますか?

どのように私は動的にサブIDをclient_idに基づいて作成できますか?

答えて

0

あなたは間違いなく、DAGが動的にオブジェクトを作成することができます。

def make_client_dag(parent_dag, client): 
    return DAG(
    '%s.client_%s' % (parent_dag.dag_id, client.name), 
    start_date = client.start_date 
) 

あなたは、あなたのメインのDAGからSubDagOperatorにそのメソッドを使用することができます。

for client in clients: 
    SubDagOperator(
    task_id='client_%s' % client.name, 
    dag=main_dag, 
    subdag = make_client_dag(main_dag, client) 
) 

これは、各メンバーに固有のsubdagを作成します。コレクションclients、およびそれぞれは、メインダグの次の呼び出しのために実行されます。私はあなたが望むバックフィルの行動を取るかどうか分からない。