リストのためにDynamoDBにクエリを実行することによって作成されるDAGを持っています。リスト内の各アイテムについて、タスクはPythonOperatorを使用して作成され、DAGに追加されます。以下の例では表示されませんが、リストの項目のいくつかは他のタスクに依存するので、私はset_upstream
を使用して依存関係を強制しています。動的にタスクのリストを作成します
- airflow_home
\- dags
\- workflow.py
workflow.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
問題がworkflow.py
では(?タスクが実行されるたびに)何度も実行して取得され、私のget_task_list()
方法は、AWSと例外をスローで絞らなっています。
私はrun_task()
が呼ばれた時はいつでも、それはので、私はこのような独立したモジュールにrun_task()
を移動しようとしましたworkflow.py
内のすべてのグローバルを実行していたので、それが思った:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
しかし、それは何も変更しませんでした。私はまだ同じ方法で動作するファクトリ関数でラップされたSubDagOperatorにget_task_list()
を入れてみました。
私の問題はこれらの問題に関連していますか?
また、なぜworkflow.py
はそれほど頻繁に実行なっていると、なぜエラーがタスク方法がworkflow.py
を参照しないと失敗するために、個々のタスク原因get_task_list()
によってスローされますと、それに依存しない?
最も重要なことは、リストを並行して処理し、リスト内の項目間の依存関係を強制する最も良い方法は何でしょうか?
'min_file_process_interval'を30に設定すると、' get_task_list() 'の呼び出しが30秒に遅れてしまい、スロットルを止めました。動的なタスクの作成については、私は[FAQ](http://airflow.readthedocs.io/)で述べたように、別のダグを構築し、それを 'globals()[dag_id]'に保存するダグを作成しようとしています。 en/latest/faq.html?highlight = dynamic#how-can-i-create-dags-dynamically) –