2017-10-10 3 views
0

私は気流を学んでおり、簡単な質問があります。以下は私のDAGは、空気の流れをテストする手段としてdog_retrieverAirflow SimpleHttpOperatorからの応答にアクセスする方法

import airflow 
from airflow import DAG 
from airflow.operators.http_operator import SimpleHttpOperator 
from airflow.operators.sensors import HttpSensor 
from datetime import datetime, timedelta 
import json 



default_args = { 
    'owner': 'Loftium', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 10, 9), 
    'email': '[email protected]', 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=3), 
} 

dag = DAG('dog_retriever', 
    schedule_interval='@once', 
    default_args=default_args) 

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2 = SimpleHttpOperator(
    task_id='get_breeds', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breeds/list', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2.set_upstream(t1) 

と呼ばれ、私は単純に、この非常に単純なhttp://dog.ceo APIにいくつかのエンドポイントへの2つのGETリクエストを作ってるんです。目的は、気流を介して取得されたいくつかのデータを処理する方法を学ぶことです

実行が正常に実行されています - 私のコードは、タスクt1とt2のenpointを正常に呼び出します。私が書いたルールset_upstreamに基づいています。

私が理解できないことは、これら2つのタスクのjson応答にアクセスする方法です。それはとても簡単なようですが、私はそれを理解することはできません。 SimpleHtttpOperatorでは、response_checkのパラメータが表示されますが、単にjsonレスポンスを表示、保存、または表示することはありません。

ありがとうございました。

答えて

2

これはSimpleHttpOperatorであり、実際のjsonはXCOMにプッシュされており、そこから取得できます。ここでは、そのアクションのためのコードの行は次のとおりです。https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

あなたはあなたの最初のT1は、以下になりますので、xcom_push=Trueに設定されているに必要なもの

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    xcom_push=True, 
    dag=dag) 

あなたはreturn valueですべてのJSONを見つけることができるはずですXCOMでは、XCOMの詳細はhttps://airflow.incubator.apache.org/concepts.html#xcoms

+0

@Chengzhiにありがとうございます。私は今からPythonOperatorを使いたいと思っています。 –

関連する問題