2016-12-21 18 views
2

実行中の最初のタスクからデータを取得するためにXCOMを使用する動的シーケンスetlジョブをセットアップしようとしています。ここでは、現在のコードは次のようになります。それは私が行方不明です小さな何か、または私はすべてを持っている場合場合気流XCOM KeyError: 'task_instance'

Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file 
    m = imp.load_source(mod_name, filepath) 
    File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module> 
    pycall >> group(1) >> complete 
    File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group 
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor') 
KeyError: 'task_instance' 

わからない:

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime as dt, timedelta as td, date 
from airflow.models import BaseOperator 
from airflow.operators.sensors import ExternalTaskSensor 
from airflow.operators.dummy_operator import DummyOperator 
from airflow.operators.python_operator import PythonOperator 
from airflow.models import Variable 

START_DT = dt.combine(dt.today(), dt.min.time()) 
END_DT = dt.combine(dt.today(), dt.max.time()) 
NOW = dt.now() 
CURRENT_EXEC = '{{ execution_date }}' 
TODAY_MD = dt.today().strftime("%m%d") 

def datetime_range(start, end, delta): 
    """Generates the date range with time separation""" 
    current = start 
    if not isinstance(delta, td): 
      delta = td(**delta) 
    while current < end: 
     yield current 
     current += delta 

default_args = { 
     'owner': 'test', 
     'depends_on_past': False, 
     'start_date': START_DT, 
     'email': ['[email protected]'], 
     'email_on_failure': False, 
     'email_on_retry': False, 
     'queue': 'etl', 
     'retries': 1, 
     'retry_delay': td(minutes=1), 
} 

dag_name = 'SEQ_TEST_01' 

dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30)) 

def seq_job(sq_dt, **kwargs): 
    for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1): 
     if sq_dt < str(dt_in): 
      curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59) 
      sequence = int(curr_seq[0]) 
      return sequence 

pycall = PythonOperator(
    task_id='seq_sensor', 
    provide_context=True, 
    python_callable=seq_job, 
    op_kwargs={'sq_dt': CURRENT_EXEC}, 
    dag=dag) 

def group(grp, **context): 
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor') 
    grp = '%0.2d' % grp 
    database = 'TEST' 
    today_date = '{{ ds_nodash }}' 
    return BashOperator(
      task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap), 
      bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp), 
      dag=dag) 

complete = DummyOperator(
     task_id='All_Sequences_complete', 
     dag=dag) 

pycall >> group(1) >> complete 
pycall >> group(2) >> complete 
pycall >> group(3) >> complete 

問題は、私がしようとするものに関係なく、私はこのエラーを取得しておくことです違う。空気流にまだ慣れず、datetime_rangeによって生成されたunique_sequence numberで30分ごとに実行するETLテストenvをセットアップしようとしており、execute_date変数に基づいています。

答えて

2

私は別の関数に、bashのオペレータを移動することによってそれを解決し、経由でPythonのオペレータからデータを引き出し:

def bash_out(group, **kwargs): 
     sequence = "{{ task_instance.xcom_pull(task_ids='seq_sensor') }}" 
     return BashOperator(task_id='ETL_{}_GRP{}'.format(database, group), bash_command='script.sh {} {} {} {}'.format(today_date, database, sequence, group), dag=dag) 

と依存関係を設定する:

pycall >> [bash_out('01'), bash_out('02'), bash_out('03')] >> complete 
1

context['ti']を代わりに使用してください。

+0

はまだ行くことを試したと:( –

関連する問題