2017-06-19 26 views
2

私の考えでは、入力リスト(ユーザー、レポート、ログファイルなど)を生成するタスクfooを作成し、入力リストのすべての要素に対してタスクを起動します。目標は、Airflowの再試行ではなく、再試行と他のロジックを使用することです。 実行時にDAGに追加されたタスクがスケジュールされない

だから、理想的には、私のDAGは、次のようになります。 enter image description here

ここでの唯一の変数は、生成されたタスクの数です。これらの作業がすべて完了したら、さらにいくつかのタスクを実行したいので、タスクごとに新しいDAGを作成することは適切ではないようです。

これは私のコードです:ログで

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2015, 6, 1) 
} 

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args) 

foo_operator = BashOperator(
    task_id='foo', 
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))), 
    xcom_push=True, 
    dag=dag) 

def gen_nodes(**kwargs): 
    ti = kwargs['ti'] 
    workers = json.loads(ti.xcom_pull(task_ids='foo')) 

    for wid in workers: 
     print("Iterating worker %s" % wid) 
     op = PythonOperator(
      task_id='test_op_%s' % wid, 
      python_callable=lambda: print("Dynamic task!"), 
      dag=dag 
     ) 

     op.set_downstream(bar_operator) 
     op.set_upstream(dummy_op) 

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes', 
    python_callable=gen_nodes, 
    provide_context=True, 
    dag=dag 
) 

gen_subdag_node_op.set_upstream(foo_operator) 

dummy_op = DummyOperator(
    task_id='dummy', 
    dag=dag 
) 

dummy_op.set_upstream(gen_subdag_node_op) 

bar_operator = DummyOperator(
    task_id='bar', 
    dag=dag) 

bar_operator.set_upstream(dummy_op) 

、私は(すなわちIterating worker 5、など)gen_nodesが正しく実行されていることがわかります。ただし、新しいタスクはスケジュールされておらず、実行されたという証拠はありません。

オンラインで関連コードサンプルが見つかりました。such as thisが動作しませんでした。何か不足していますか?

また、この問題(作業単位の分離)には、より適切なアプローチがありますか?

答えて

2

現在のところ、エアフローは、ダグ実行中にタスクの追加/削除をサポートしていません。

ワークフローの順序は、ダグ実行の開始時に評価されるものになります。

See the second paragraph here.

これは、実行中に起こる何かに基づいてタスクを追加/削除することができないことを意味します。実行に関連しないものに基づいてforループにXタスクを追加することはできますが、実行が開始された後はワークフローの形状/順序を変更することはありません。

多くの時間があなたの代わりに、DAGの実行中に決定を下すためにBranchPythonOperatorを使用することができます(これらの決定は、あなたのxcom値に基づいて行うことができる)が、彼らはすでにワークフローに存在するブランチを下るという決定である必要があります。

ダグが実行され、ダグの定義は完全に直感的ではない方法で、空気の流れに分離されているが、より多くまたはダグラン(xcomdag_run.confなど)の内部で生成された作成された以下のものは/定義には使用できませんダグそのもの。

+0

回答ありがとうございます。@ jhnclvrですから、本質的に、N個のアイテムを繰り返したい場合、 'xcom'からリストを引っ張って一つのタスクで反復することがこれを行う唯一の方法ですか? – Gediminas

+0

これは確かにうまくいく方法です。あなたのシナリオに合っていれば 'xcom'値に基づいてダウンできるX個のブランチを持つこともできます。あるいは、同様に 'xcom'に基づいて' TriggerDagRunOperator'を使って他のダグをトリガすることができます。 – jhnclvr

+0

私は1:に数分かかり、2:は任意の回数実行する必要があるというタスク(ファイルの解凍)があります。ですから、xcomの展開するファイルのリストを繰り返し処理するタスクを作成しようとすると、タイムアウトはできません。 –

関連する問題