私は気流に精通し、これまでこれを愛しています。エアフローのパラメータをループする最も良い方法は?
しかし、私は少しはっきりしていませんが、私は同じダッグを実行したいが、複数のビジネスライン(ロブ)を並行して実行したい私のダグを正しく統制する方法です。だから基本的には、それぞれのランで複数のロブのために以下のダグを走らせ、それぞれのロブをパラレルで走らせたい。
"lob1"、 "lob2"などのようなロブの配列である変数を定義することができます。私は 'lob1'と 'lob2'の下のbigquery SQL文で 'mylob'
私はおそらく私は変数からuiのループを格納することができますし、ダグでそれをループするが、私はそれが各タスクを待つので各ループの繰り返しで終了します。
私は、このパラメタイズされたダグを一種の大きなドライバーダグのサブダグとして使うことが考えられます。しかし、これがベストプラクティスのアプローチであるかどうかはもう一度確かめてください。
大変助かりました。私はここで何かを見逃しているように感じますが、どこかでこのような例を見つけるのはあまりありません。
"""
### My first dag to play around with bigquery and gcp stuff.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from dateutil import tz
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 3, 10),
'email': ['[email protected]'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
with DAG('my_bq_dag_2', schedule_interval='30 */1 * * *',
default_args=default_args) as dag:
bq_msg_1 = BigQueryOperator(
task_id='my_bq_task_1',
bql='select "mylob" as lob, "Hello World!" as msg',
destination_dataset_table='airflow.test1',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='gcp_smoke'
)
bq_msg_1.doc_md = """\
#### Task Documentation
Append a "Hello World!" message string to the table [airflow.msg]
"""
bq_msg_2 = BigQueryOperator(
task_id='my_bq_task_2',
bql='select "mylob" as lob, "Goodbye World!" as msg',
destination_dataset_table='airflow.test1',
write_disposition='WRITE_APPEND',
bigquery_conn_id='gcp_smoke'
)
bq_msg_2.doc_md = """\
#### Task Documentation
Append a "Goodbye World!" message string to the table [airflow.msg]
"""
# set dependencies
bq_msg_2.set_upstream(bq_msg_1)
アップデート:この作業を取得しようとするが、それは
"""
### My first dag to play around with bigquery and gcp stuff.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 3, 10),
'email': ['[email protected]'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('my_bq_dag_2', schedule_interval='@once',default_args=default_args)
lobs = ["lob1","lob2","lob3"]
for lob in lobs:
templated_command = """
select '{{ params.lob }}' as lob, concat(string(current_timestamp()),' - Hello - {{ ds }}') as msg
"""
bq_msg_1 = BigQueryOperator(
dag = dag,
task_id='my_bq_task_1',
bql=templated_command,
params={'lob': lob},
destination_dataset_table='airflow.test1',
write_disposition='WRITE_APPEND',
bigquery_conn_id='gcp_smoke'
)
この例では、やや私はhttps://github.com/apache/incubator-airflow/blob/master/airflow/contrib/example_dags/example_twitter_dag.py分岐使用して必要なもののようであるかもしれないように思える – andrewm4894
別ここでconfパラメータを使用するオプションhttps://github.com/andrewm4894/random/blob/master/my_bq_dag_2.py – andrewm4894