我々は両方の組み合わせを使用しています
#create a sub dag containing DROP -> CREATE -> INSERT
sub_create_dag = DAG('%s.%s' % (parent_dag_name, child_dag_name), default_args=args)
pg_drop = DropPostgresOperator(task_id='drop_{}'.format(table), sql="", params={'schema': schema, 'table': table}, postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag)
pg_create = PostgresOperator(task_id='create_{}'.format(table), sql='{sql_path}/create_{name}.sql'.format(sql_path=sql_path, name=table), postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag)
pg_insert = PostgresOperator(task_id='insert_{}'.format(table), sql='{sql_path}/insert_{name}.sql'.format(sql_path=sql_path, name=table), postgres_conn_id=args['connection_id'], autocommit=True, dag=sub_create_dag)
pg_drop >> pg_create >> pg_insert
return dag
問題は、私のようなエラーを持っているつもりだということです。私たちはプラグインフォルダ内のカスタム演算子として独立したタスクを実装していますが、Pythonのような小さな関数はDAGフォルダ自体の中で機能します。 エラーについては、ジンジャがテンプレートフォルダを見つけられなかったためです。このエラーは、DAGまたはプラグインのいずれかのフォルダにヘルパー関数を実装した場合には表示されません。
出典
2017-06-26 08:58:44
Him