私の考えでは、入力リスト(ユーザー、レポート、ログファイルなど)を生成するタスクfoo
を作成し、入力リストのすべての要素に対してタスクを起動します。目標は、Airflowの再試行ではなく、再試行と他のロジックを使用することです。 実行時にDAGに追加されたタスクがスケジュールされない
ここでの唯一の変数は、生成されたタスクの数です。これらの作業がすべて完了したら、さらにいくつかのタスクを実行したいので、タスクごとに新しいDAGを作成することは適切ではないようです。
これは私のコードです:ログで
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op = DummyOperator(
task_id='dummy',
dag=dag
)
dummy_op.set_upstream(gen_subdag_node_op)
bar_operator = DummyOperator(
task_id='bar',
dag=dag)
bar_operator.set_upstream(dummy_op)
、私は(すなわちIterating worker 5
、など)gen_nodes
が正しく実行されていることがわかります。ただし、新しいタスクはスケジュールされておらず、実行されたという証拠はありません。
オンラインで関連コードサンプルが見つかりました。such as thisが動作しませんでした。何か不足していますか?
また、この問題(作業単位の分離)には、より適切なアプローチがありますか?
回答ありがとうございます。@ jhnclvrですから、本質的に、N個のアイテムを繰り返したい場合、 'xcom'からリストを引っ張って一つのタスクで反復することがこれを行う唯一の方法ですか? – Gediminas
これは確かにうまくいく方法です。あなたのシナリオに合っていれば 'xcom'値に基づいてダウンできるX個のブランチを持つこともできます。あるいは、同様に 'xcom'に基づいて' TriggerDagRunOperator'を使って他のダグをトリガすることができます。 – jhnclvr
私は1:に数分かかり、2:は任意の回数実行する必要があるというタスク(ファイルの解凍)があります。ですから、xcomの展開するファイルのリストを繰り返し処理するタスクを作成しようとすると、タイムアウトはできません。 –