2017-05-30 30 views
0

私は、2つのタスクで構成された簡単なエアフローワークフローを持っています。ストックデータを含むcsvファイルをダウンロードします。他方は最大株価を抽出し、そのデータを別のファイルに書き込む。エアフローがタスクの依存性を満たしていない

最初のタスクを実行し、次に2番目のものがうまくいけば、代わりに実行:airflow runs_d get_max_shareを実行すると依存関係を満たしていません。

import csv 
from datetime import datetime 
from datetime import timedelta 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
import requests 


def get_stock_data(): 
    url = "https://app.quotemedia.com/quotetools/getHistoryDownload.csv?&webmasterId=501&startDay=02&startMonth=02&startYear=2002&endDay=02&endMonth=07&endYear=2009&isRanged=false&symbol=APL" 
    try: 
     r = requests.get(url) 
    except requests.RequestException as re: 
     raise 
    else: 
     with open('/tmp/stocks/airflow_stock_data.txt', 'w') as f: 
      f.write(r.text) 

def get_max_share(): 
    stock_data = [] 
    stock_max = {} 
    with open('/tmp/stocks/airflow_stock_data.txt', 'r') as f: 
     stock_reader = csv.reader(f) 
     next(stock_reader, None) 
     for row in stock_reader: 
      stock_data.append(row) 

    for stock in stock_data: 
     stock_max[stock[2]] = stock[0] 

    with open('/tmp/stocks/max_stock', 'w') as f: 
     stock_price = max(stock_max.keys()) 
     stock_max_price_date = stock_max[stock_price] 
     stock_entry = stock_max_price_date + ' -> ' + stock_price 
     f.write(stock_entry) 


default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 5, 30), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    'catchup': False, 
    } 

dag = DAG('stocks_d', default_args=default_args, schedule_interval=timedelta(minutes=5)) 


task_get_stocks = PythonOperator(task_id='get_stocks', python_callable=get_stock_data, dag=dag) 
task_get_max_share = PythonOperator(task_id='get_max_share', python_callable=get_max_share, dag=dag) 

task_get_max_share.set_upstream(task_get_stocks) 

なぜ起こりますか?

答えて

0

$ airflow runs_d get_max_share 上記のコマンドは、get_max_shareを実行します。実行する前の前のタスクではありません。

あなたは全体のDAG実行を確認する必要がある場合は、コマンドの下にしようと $気流trigger_dag stocks_d

+0

何も起こりません。ファイルは作成されません。 – bsd

関連する問題