0
私は、2つのタスクで構成された簡単なエアフローワークフローを持っています。ストックデータを含むcsvファイルをダウンロードします。他方は最大株価を抽出し、そのデータを別のファイルに書き込む。エアフローがタスクの依存性を満たしていない
最初のタスクを実行し、次に2番目のものがうまくいけば、代わりに実行:airflow runs_d get_max_shareを実行すると依存関係を満たしていません。
import csv
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests
def get_stock_data():
url = "https://app.quotemedia.com/quotetools/getHistoryDownload.csv?&webmasterId=501&startDay=02&startMonth=02&startYear=2002&endDay=02&endMonth=07&endYear=2009&isRanged=false&symbol=APL"
try:
r = requests.get(url)
except requests.RequestException as re:
raise
else:
with open('/tmp/stocks/airflow_stock_data.txt', 'w') as f:
f.write(r.text)
def get_max_share():
stock_data = []
stock_max = {}
with open('/tmp/stocks/airflow_stock_data.txt', 'r') as f:
stock_reader = csv.reader(f)
next(stock_reader, None)
for row in stock_reader:
stock_data.append(row)
for stock in stock_data:
stock_max[stock[2]] = stock[0]
with open('/tmp/stocks/max_stock', 'w') as f:
stock_price = max(stock_max.keys())
stock_max_price_date = stock_max[stock_price]
stock_entry = stock_max_price_date + ' -> ' + stock_price
f.write(stock_entry)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 5, 30),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'catchup': False,
}
dag = DAG('stocks_d', default_args=default_args, schedule_interval=timedelta(minutes=5))
task_get_stocks = PythonOperator(task_id='get_stocks', python_callable=get_stock_data, dag=dag)
task_get_max_share = PythonOperator(task_id='get_max_share', python_callable=get_max_share, dag=dag)
task_get_max_share.set_upstream(task_get_stocks)
なぜ起こりますか?
何も起こりません。ファイルは作成されません。 – bsd