2017-07-15 13 views
0

リストのためにDynamoDBにクエリを実行することによって作成されるDAGを持っています。リスト内の各アイテムについて、タスクはPythonOperatorを使用して作成され、DAGに追加されます。以下の例では表示されませんが、リストの項目のいくつかは他のタスクに依存するので、私はset_upstreamを使用して依存関係を強制しています。動的にタスクのリストを作成します

- airflow_home 
    \- dags 
    \- workflow.py 

workflow.py

def get_task_list(): 
    # ... query dynamodb ... 

def run_task(task): 
    # ... do stuff ... 

dag = DAG(dag_id='my_dag', ...) 
tasks = get_task_list() 
for task in tasks: 
    t = PythonOperator(
     task_id=task['id'], 
     provide_context=False, 
     dag=dag, 
     python_callable=run_task, 
     op_args=[task] 
    ) 

問題がworkflow.pyでは(?タスクが実行されるたびに)何度も実行して取得され、私のget_task_list()方法は、AWSと例外をスローで絞らなっています。

私はrun_task()が呼ばれた時はいつでも、それはので、私はこのような独立したモジュールにrun_task()を移動しようとしましたworkflow.py内のすべてのグローバルを実行していたので、それが思った:

- airflow_home 
    \- dags 
    \- workflow.py 
    \- mypackage 
     \- __init__ 
     \- task.py 

しかし、それは何も変更しませんでした。私はまだ同じ方法で動作するファクトリ関数でラップされたSubDagOperatorにget_task_list()を入れてみました。

私の問題はこれらの問題に関連していますか?

また、なぜworkflow.pyはそれほど頻繁に実行なっていると、なぜエラーがタスク方法がworkflow.pyを参照しないと失敗するために、個々のタスク原因get_task_list()によってスローされますと、それに依存しない?

最も重要なことは、リストを並行して処理し、リスト内の項目間の依存関係を強制する最も良い方法は何でしょうか?

答えて

1

参照した質問によれば、エアフローは、ダッグ実行中のタスク作成をサポートしていません。

したがって、実行が開始される前に、エアフローによって完全なDAG定義が定期的に生成されます。理想的には、そのような世代の期間は、そのDAGのスケジュール間隔と同じである必要があります。

BUT気流がダグの変更をチェックするたびに、完全なダグが生成され、リクエストが多すぎる可能性があります。この時間は、airflow.cfgのmin_file_process_intervalおよびdag_dir_list_intervalの設定を使用して制御されます。

タスクの失敗に関しては、ダグの作成自体に失敗し、エアフローでタスクを開始できなかったため、タスクが失敗します。

+0

'min_file_process_interval'を30に設定すると、' get_task_list() 'の呼び出しが30秒に遅れてしまい、スロットルを止めました。動的なタスクの作成については、私は[FAQ](http://airflow.readthedocs.io/)で述べたように、別のダグを構築し、それを 'globals()[dag_id]'に保存するダグを作成しようとしています。 en/latest/faq.html?highlight = dynamic#how-can-i-create-dags-dynamically) –

関連する問題