この質問に関連する元のコードはhereです。気流で複数のタスクを生成するときの上流/下流関係を逆にした
私はDAGで定義したタスクループの中で、ビットシフト演算子とset_upstream
/set_downstream
のメソッドがうまく動作しています。次のようにDAGのメイン実行ループが設定されている場合:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
又は
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
グラフは、このようになります(英数字シーケンスも、タスクIDを定義するユーザIDであります):
私はこのようなDAGのメイン実行ループを設定するとき:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
または
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
グラフは次のようになります。
2番目のグラフは、私が持っているコードの最初の2つのスニペットを期待しているだろうか/私が欲しいものです私の読書に基づいて作成されました。ここで私は、最後のいくつかの質問を掲示以来更新されている完全なコードは、だ - 私はclear_tables
は、異なるユーザーIDのタスクを解析するデータの私のバッチをトリガする前に、最初に実行したい場合は、私はclear_tables >> id_worker(uid)
EDITとしてこれを示すべきです、参考のために:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
'start_date': datetime.now(),
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=id,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in dash_workers.get_id_creds():
preproc_task = id_worker(uid)
clear_tables << preproc_task
LadislavIndraの提案@実装した後、私は正しい依存関係グラフを得るためにビットシフト演算子の同じ逆の実装を持ち続けます。
UPDATE @ AshBerlin-Taylorの回答はここでは何が起こっているのですか?グラフビューとツリービューは同じことをしていると仮定しましたが、そうではありません。ここでid_worker(uid) >> clear_tables
がグラフビューで次のようになります。
私は確かに、すべてのデータテーブルを削除するように私のデータの最終段階事前準備ルーチンをしたくありません!
@ AshBerli-Taylor - これに釘付け!グラフビューからのスクリーンショットによる更新の投稿。 – Aaron