私は気流を学んでおり、簡単な質問があります。以下は私のDAGは、空気の流れをテストする手段としてdog_retriever
Airflow SimpleHttpOperatorからの応答にアクセスする方法
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
default_args = {
'owner': 'Loftium',
'depends_on_past': False,
'start_date': datetime(2017, 10, 9),
'email': '[email protected]',
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
dag = DAG('dog_retriever',
schedule_interval='@once',
default_args=default_args)
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
dag=dag)
t2 = SimpleHttpOperator(
task_id='get_breeds',
method='GET',
http_conn_id='http_default',
endpoint='api/breeds/list',
headers={"Content-Type": "application/json"},
dag=dag)
t2.set_upstream(t1)
と呼ばれ、私は単純に、この非常に単純なhttp://dog.ceo APIにいくつかのエンドポイントへの2つのGETリクエストを作ってるんです。目的は、気流を介して取得されたいくつかのデータを処理する方法を学ぶことです
実行が正常に実行されています - 私のコードは、タスクt1とt2のenpointを正常に呼び出します。私が書いたルールset_upstream
に基づいています。
私が理解できないことは、これら2つのタスクのjson応答にアクセスする方法です。それはとても簡単なようですが、私はそれを理解することはできません。 SimpleHtttpOperatorでは、response_checkのパラメータが表示されますが、単にjsonレスポンスを表示、保存、または表示することはありません。
ありがとうございました。
@Chengzhiにありがとうございます。私は今からPythonOperatorを使いたいと思っています。 –