2017-03-23 10 views
2

次のDAGには2つのSSHExecuteOperatorタスクがあります。最初のタスクは、パラメータを返すストアドプロシージャを実行します。第2のタスクは、このパラメータを入力として必要とする。Airflowから値を取得する方法SSHExecuteOperatorでプッシュされたXCom

task2で使用するために、task1にプッシュされたXComから値を引き出す方法を教えてください。

from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.contrib.hooks.ssh_hook import SSHHook 
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator 
from airflow.models import Variable 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 0 
} 

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin 
DataQualitySSHHook = Variable.get('DataQualitySSHHook') 
print('Connecting to: ' + DataQualitySSHHook) 
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook) 
sshHookEtl.no_host_key_check = True 

#create dag 
dag = DAG(
    'ed_data_quality_test-v0.0.3', #update version whenever you change something 
    default_args=default_args, 
    schedule_interval="0 0 * * *", 
    dagrun_timeout=timedelta(hours=24), 
    max_active_runs=1) 

#create tasks 
task1 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_batch_register', 
    bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end 
    ssh_hook=sshHookEtl, 
    xcom_push=True, 
    retries=0, 
    dag=dag) 

task2 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_module_session_start', 
    bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}", 
    ssh_hook=sshHookEtl, 
    retries=0, 
    dag=dag) 

#create dependencies 
task1.set_downstream(task2) 
+0

あなたのDAGの定義は大丈夫そうです。 DAGを正常に実行できますか?エラーはありますか? –

答えて

1

だから私が見つけた解決策は、タスク1は、シェルスクリプトを実行したとき、あなたはXCOM変数によって捕獲したいパラメータは、(エコーを使用して)スクリプトによって印刷された最後のものであることを確認する必要があります。

は、その後、私は次のコードスニペットでXCOM変数の値を取得することができました:

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}

関連する問題