2017-03-21 16 views
2

私は気流に精通し、これまでこれを愛しています。エアフローのパラメータをループする最も良い方法は?

しかし、私は少しはっきりしていませんが、私は同じダッグを実行したいが、複数のビジネスライン(ロブ)を並行して実行したい私のダグを正しく統制する方法です。だから基本的には、それぞれのランで複数のロブのために以下のダグを走らせ、それぞれのロブをパラレルで走らせたい。

"lob1"、 "lob2"などのようなロブの配列である変数を定義することができます。私は 'lob1'と 'lob2'の下のbigquery SQL文で 'mylob'

私はおそらく私は変数からuiのループを格納することができますし、ダグでそれをループするが、私はそれが各タスクを待つので各ループの繰り返しで終了します。

私は、このパラメタイズされたダグを一種の大きなドライバーダグのサブダグとして使うことが考えられます。しかし、これがベストプラクティスのアプローチであるかどうかはもう一度確かめてください。

大変助かりました。私はここで何かを見逃しているように感じますが、どこかでこのような例を見つけるのはあまりありません。

""" 
### My first dag to play around with bigquery and gcp stuff. 
""" 

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 
from dateutil import tz 
from airflow.contrib.hooks.bigquery_hook import BigQueryHook 
from airflow.contrib.operators.bigquery_operator import BigQueryOperator 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 3, 10),  
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *', 
     default_args=default_args) as dag: 

    bq_msg_1 = BigQueryOperator(
     task_id='my_bq_task_1', 
     bql='select "mylob" as lob, "Hello World!" as msg', 
     destination_dataset_table='airflow.test1', 
     write_disposition='WRITE_TRUNCATE', 
     bigquery_conn_id='gcp_smoke' 
    ) 

    bq_msg_1.doc_md = """\ 
    #### Task Documentation 
    Append a "Hello World!" message string to the table [airflow.msg] 
    """ 

    bq_msg_2 = BigQueryOperator(
     task_id='my_bq_task_2', 
     bql='select "mylob" as lob, "Goodbye World!" as msg', 
     destination_dataset_table='airflow.test1', 
     write_disposition='WRITE_APPEND', 
     bigquery_conn_id='gcp_smoke' 
    ) 

    bq_msg_2.doc_md = """\ 
    #### Task Documentation 
    Append a "Goodbye World!" message string to the table [airflow.msg] 
    """ 

    # set dependencies 
    bq_msg_2.set_upstream(bq_msg_1) 

アップデート:この作業を取得しようとするが、それは

""" 
### My first dag to play around with bigquery and gcp stuff. 
""" 

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 
from airflow.contrib.operators.bigquery_operator import BigQueryOperator 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 3, 10),  
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

dag = DAG('my_bq_dag_2', schedule_interval='@once',default_args=default_args) 

lobs = ["lob1","lob2","lob3"] 

for lob in lobs: 

    templated_command = """ 
     select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg 
    """  

    bq_msg_1 = BigQueryOperator(
     dag = dag, 
     task_id='my_bq_task_1', 
     bql=templated_command, 
     params={'lob': lob}, 
     destination_dataset_table='airflow.test1', 
     write_disposition='WRITE_APPEND', 
     bigquery_conn_id='gcp_smoke' 
    ) 
+0

この例では、やや私はhttps://github.com/apache/incubator-airflow/blob/master/airflow/contrib/example_dags/example_twitter_dag.py分岐使用して必要なもののようであるかもしれないように思える – andrewm4894

+0

別ここでconfパラメータを使用するオプションhttps://github.com/andrewm4894/random/blob/master/my_bq_dag_2.py – andrewm4894

答えて

1

使用trigger_dagの概念をLOB2ために作るように見えることはありません。そのような使用の場合に意味されます。ここで、コントローラダグからサブダグにパラメータを渡します。エアフローインストールのexamplesフォルダーに例があります。

関連する問題