2017-06-19 21 views
2

私は3つの演算子の単純なDAGを持っています。最初のものはPythonOperatorであり、他の2つは標準オペレータairflow.contrib(正確にはFileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator)です。彼らは順番に動作します。私たちのカスタムタスクは、パラメータに応じて、通常2〜5のファイルを生成します。これらのファイルはすべて後続のタスクで個別に処理する必要があります。つまり、私はいくつかの下流のブランチが必要ですが、DAGが実行される前にいくつのブランチがあるかは分かりません。エアフローDAGを動的にネストする方法は?

どのようにこの問題にアプローチしますか?

UPDATE:

BranchPythonOperatorを使用する出発点として、彼のanother replyで述べたjhnclvr、私はスキップや状況に応じて、ブランチを実行し続けることになるオペレータを作成しました。このアプローチは可能な限り多くのブランチが知られており、十分に小さいために実行可能です。

オペレータ:

class SkipOperator(PythonOperator): 
    def execute(self, context): 
     boolean = super(SkipOperator, self).execute(context) 
     session = settings.Session() 
     for task in context['task'].downstream_list: 
      if boolean is False: 
       ti = TaskInstance(
        task, execution_date=context['ti'].execution_date) 
       ti.state = State.SKIPPED 
       ti.start_date = datetime.now() 
       ti.end_date = datetime.now() 
       session.merge(ti) 
     session.commit() 
     session.close() 

使用法:それは実行しているとき

def check(i, templates_dict=None, **kwargs): 
    return len(templates_dict["data_list"].split(",")) > i 

dag = DAG(
    dag_name, 
    default_args=default_args, 
    schedule_interval=None 
) 

load = CustomOperator(
    task_id="load_op", 
    bash_command=' '.join([ 
     './command.sh' 
     '--data-list {{ dag_run.conf["data_list"]|join(",") }}' 
    ]), 
    dag=dag 
) 

for i in range(0, 5): 
    condition = SkipOperator(
     task_id=f"{dag_name}_condition_{i}", 
     python_callable=partial(check, i), 
     provide_context=True, 
     templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'}, 
     dag=dag 
    ) 
    gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i 

    load_to_gcs = CustomFileToGoogleCloudStorageOperator(
     task_id=f"{dag_name}_to_gs_{i}", 
     src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i, 
     bucket=gs_bucket, 
     dst=gs_filename, 
     mime_type='application/json', 
     google_cloud_storage_conn_id=connection_id, 
     dag=dag 
    ) 
    load_to_bq = GoogleCloudStorageToBigQueryOperator(
     task_id=f"{dag_name}_to_bq_{i}", 
     bucket=gs_bucket, 
     source_objects=[gs_filename, ], 
     source_format='NEWLINE_DELIMITED_JSON', 
     destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i, 
     bigquery_conn_id=connection_id, 
     schema_fields={}, 
     google_cloud_storage_conn_id=connection_id, 
     write_disposition='WRITE_TRUNCATE', 
     dag=dag 
    ) 

    condition.set_upstream(load) 
    load_to_gcs.set_upstream(condition) 
    load_to_bq.set_upstream(load_to_gcs) 

答えて

2

See a similar (but different) question here

は基本的に、あなたがDAGにタスクを追加することはできません。追加するタスクの数を事前に知る必要があります。

1人のオペレータでN個のファイルを処理できます。

ファイルを処理する別の独立したダグがある場合は、DAGをN回トリガーして、conf内のファイル名を渡すことができます。

See here for an example of the TriggerDagRunOperator.

See here for the DAG that would be triggered.

And lastly see this post from which the above examples are from.

関連する問題