私は3つの演算子の単純なDAGを持っています。最初のものはPythonOperator
であり、他の2つは標準オペレータairflow.contrib
(正確にはFileToGoogleCloudStorageOperator
とGoogleCloudStorageToBigQueryOperator
)です。彼らは順番に動作します。私たちのカスタムタスクは、パラメータに応じて、通常2〜5のファイルを生成します。これらのファイルはすべて後続のタスクで個別に処理する必要があります。つまり、私はいくつかの下流のブランチが必要ですが、DAGが実行される前にいくつのブランチがあるかは分かりません。エアフローDAGを動的にネストする方法は?
どのようにこの問題にアプローチしますか?
UPDATE:
BranchPythonOperator
を使用する出発点として、彼のanother replyで述べたjhnclvr、私はスキップや状況に応じて、ブランチを実行し続けることになるオペレータを作成しました。このアプローチは可能な限り多くのブランチが知られており、十分に小さいために実行可能です。
オペレータ:
class SkipOperator(PythonOperator):
def execute(self, context):
boolean = super(SkipOperator, self).execute(context)
session = settings.Session()
for task in context['task'].downstream_list:
if boolean is False:
ti = TaskInstance(
task, execution_date=context['ti'].execution_date)
ti.state = State.SKIPPED
ti.start_date = datetime.now()
ti.end_date = datetime.now()
session.merge(ti)
session.commit()
session.close()
使用法:それは実行しているとき
def check(i, templates_dict=None, **kwargs):
return len(templates_dict["data_list"].split(",")) > i
dag = DAG(
dag_name,
default_args=default_args,
schedule_interval=None
)
load = CustomOperator(
task_id="load_op",
bash_command=' '.join([
'./command.sh'
'--data-list {{ dag_run.conf["data_list"]|join(",") }}'
]),
dag=dag
)
for i in range(0, 5):
condition = SkipOperator(
task_id=f"{dag_name}_condition_{i}",
python_callable=partial(check, i),
provide_context=True,
templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
dag=dag
)
gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i
load_to_gcs = CustomFileToGoogleCloudStorageOperator(
task_id=f"{dag_name}_to_gs_{i}",
src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
bucket=gs_bucket,
dst=gs_filename,
mime_type='application/json',
google_cloud_storage_conn_id=connection_id,
dag=dag
)
load_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id=f"{dag_name}_to_bq_{i}",
bucket=gs_bucket,
source_objects=[gs_filename, ],
source_format='NEWLINE_DELIMITED_JSON',
destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
bigquery_conn_id=connection_id,
schema_fields={},
google_cloud_storage_conn_id=connection_id,
write_disposition='WRITE_TRUNCATE',
dag=dag
)
condition.set_upstream(load)
load_to_gcs.set_upstream(condition)
load_to_bq.set_upstream(load_to_gcs)