2016-07-11 10 views
12

私はAirflow subDAGセクションを見て、役立つと思われるものをオンラインで見つけようとしましたが、subDAGを作成する方法を詳しく説明したものは見つかりませんでした。サブDAGを実行するための要件の1つは、有効にする必要があることです。サブダッグをどのように有効/無効にしますか?AirflowでsubDAGはどのくらい正確に機能しますか?サブDAGを有効にするとはどういう意味ですか?

気流のエラーを表示しないサンプルコードを書きましたが、実行しようとすると、subDAGの演算子は実行されません。

これは私のメインのDAGコードです:このコードで

import os 
from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 
from airflow.operators.subdag_operator import SubDagOperator 
from linecount_subdag import sub_dag 

parent_dag_name = 'example_linecount_dag' 
child_dag_name = 'example_linecount_subdag' 

args = { 
    'owner': 'airflow', 
    'start_date': datetime(2016, 04, 20), 
    'retries': 0, 
} 
main_dag = DAG(
    dag_id=parent_dag_name, 
    default_args=args, 
    schedule_interval=timedelta(minutes=5), 
    start_date=datetime(2016, 04, 20), 
    max_active_runs=1 
) 

subdag = SubDagOperator(
    subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval), 
    task_id=child_dag_name, 
    default_args=args, 
    dag=main_dag) 
t = BashOperator(
    task_id='start', 
    bash_command='echo "waiting for subdag..."', 
    default_args=args, 
    dag=main_dag) 
t.set_downstream(subdag) 

、タスク「スタート」は成功、しかしsubdagタスクは何もしませんし、どちらも失敗もなく成功します。ここで

は私subDAGコードです:

from airflow.models import DAG 
from airflow.operators import BashOperator 

# Dag is returned by a factory method 
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval): 
    dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name), 
    default_args=args, 
    start_date=args['start_date'], 
    max_active_runs=1, 
) 
    t1 = BashOperator(
    task_id='count_lines', 
    bash_command='cat /root/airflow/airflow.cfg | wc -l', 
    default_args=args, 
    xcom_push=True, 
    dag=dag) 
    t2 = BashOperator(
    task_id='retrieve_val', 
    bash_command='grep "airflow_home" /root/airflow/airflow.cfg', 
    default_args=args, 
    xcom_push=True, 
    dag=dag) 
    templated_command = """ 
    { 
     echo "{{ ti.xcom_pull(task_ids='count_lines') }}" 
     echo "{{ ti.xcom_pull(task_ids='retrieve_val') }}" 
    }""" 
    t3 = BashOperator(
    task_id='print_values', 
    bash_command=templated_command, 
    default_args=args, 
    dag=dag) 
    t3.set_upstream(t1) 
    t3.set_upstream(t2) 
    return dag 

このコードでは3事業者は、そのファイルに「airflow_home」の値を見つけ、ファイル「airflow.cfg」の行数を取得し、リターン両方の値が印刷されます。このコードは単独で動作するので、問題はないと思います。

subDAGの演算子を実行するにはどうすればよいですか?

+0

あなたがこれを理解しました:ここ

はsubdagの最後のステップからの出力はありますか? – trench

答えて

2

私はあなたのコードをローカルで使用しており、うまくいきます。

私が変更したのは、外側のダグとサブダッグの両方をschedule_interval = Noneに設定し、手動でトリガーしたことだけでした。

日時(2016、04、20)5分ます洪水多く埋め戻し要求に気流スケジューラのschedule_intervalの開始日を有します。

LocalExecutorからCeleryExecutorへの切り替えが必要な場合があります。 LocalExecutorはかなり制限されています。

[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:  { 
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:   echo "226" 
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:   echo "airflow_home = /root/airflow/" 
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:  } 
関連する問題