2017-07-28 36 views
0

この質問に関連する元のコードはhereです。気流で複数のタスクを生成するときの上流/下流関係を逆にした

私はDAGで定義したタスクループの中で、ビットシフト演算子とset_upstream/set_downstreamのメソッドがうまく動作しています。次のようにDAGのメイン実行ループが設定されている場合:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_downstream(id_worker(uid)) 

又は

for uid in dash_workers.get_id_creds(): 
    clear_tables >> id_worker(uid) 

グラフは、このようになります(英数字シーケンスも、タスクIDを定義するユーザIDであります):

enter image description here

私はこのようなDAGのメイン実行ループを設定するとき:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_upstream(id_worker(uid)) 

または

for uid in dash_workers.get_id_creds(): 
    id_worker(uid) >> clear_tables 

グラフは次のようになります。

enter image description here

2番目のグラフは、私が持っているコードの最初の2つのスニペットを期待しているだろうか/私が欲しいものです私の読書に基づいて作成されました。ここで私は、最後のいくつかの質問を掲示以来更新されている完全なコードは、だ - 私はclear_tablesは、異なるユーザーIDのタスクを解析するデータの私のバッチをトリガする前に、最初に実行したい場合は、私はclear_tables >> id_worker(uid)

EDITとしてこれを示すべきです、参考のために:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

ENV = os.environ 

default_args = { 
    'start_date': datetime.now(), 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

clear_tables = PythonOperator(
    task_id='clear_tables', 
    python_callable=dash_workers.clear_db, 
    dag=DAG) 

def id_worker(uid): 
    return PythonOperator(
     task_id=id, 
     python_callable=dash_workers.main_preprocess, 
     op_args=[uid], 
     dag=DAG) 

for uid in dash_workers.get_id_creds(): 
    preproc_task = id_worker(uid) 
    clear_tables << preproc_task 

LadislavIndraの提案@実装した後、私は正しい依存関係グラフを得るためにビットシフト演算子の同じ逆の実装を持ち続けます。

UPDATE @ AshBerlin-Taylorの回答はここでは何が起こっているのですか?グラフビューとツリービューは同じことをしていると仮定しましたが、そうではありません。ここでid_worker(uid) >> clear_tablesがグラフビューで次のようになります。

enter image description here

私は確かに、すべてのデータテーブルを削除するように私のデータの最終段階事前準備ルーチンをしたくありません!

答えて

3

ツリービューはそれについてどのように(!やI)最初に考えに「逆方向」です。最初のスクリーンショットでは、 "clear_tables"を "AAAG5608078M2"実行タスクの前に実行する必要があることを示しています。また、DAGステータスは、各id作業者のタスクによって異なります。したがって、タスクオーダーの代わりに、ステータスチェーンのツリーです。それが意味をなさないならば。

(これは、最初は奇妙に思えるかもしれないが、DAGは後ろに出て分岐し、分岐できるためです。)

あなたはより良い運があなたのDAGのグラフビューを見ていたかもしれません。これには矢印があり、より直感的な方法で実行順序を示します。 (私は今、ツリービューが有用であることを知っていますが、それはあまり明確ではありません)

+0

@ AshBerli-Taylor - これに釘付け!グラフビューからのスクリーンショットによる更新の投稿。 – Aaron

1

あなたの他のコードを見てみると、get_id_credsがあなたの仕事であり、それをループしていると思われます。これはいくつかの奇妙な相互作用を作り出しています。動作します

パターンがある:エアフローで

clear_tables = MyOperator() 

for uid in uid_list: 
    my_task = MyOperator(task_id=uid) 
    clear_tables >> my_task 
+0

ありがとう@LadislavIndra、しかしそれでも動作しません。私は完全なコードで質問を更新するつもりです。 – Aaron

関連する問題