2017-11-27 38 views
0

気流テンプレート内でJSON文字列を解析することはできますか?気流テンプレート内のjson文字列を解析する方法

REST API経由でジョブを監視するHttpSensorを持っていますが、ジョブIDはというxcom_pushのアップストリームタスクの応答にあります。私は、次のような何かをしたいと思います

が、しかし、このコードは、エラーにjinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1", 
    task_id="job", 
    endpoint="some_url", 
    method='POST', 
    data=json.dumps({ "foo": "bar" }), 
    xcom_push=True, 
    dag=dag, 
) 

t2 = HttpSensor(
    http_conn_id="s1", 
    task_id="finish_job", 
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}", 
    response_check=lambda response: True if response.json().state == "complete" else False, 
    poke_interval=5, 
    dag=dag 
) 

t2.set_upstream(t1) 

答えて

0

を与えるあなたは、JSONを解析するために、パラメータuser_defined_filtersであなたのDAGにカスタム神社フィルタを追加することができます。

ジンジャーテンプレートに公開されるフィルタの辞書 。たとえば、 dict(hello=lambda name: 'Hello %s' % name)をこの引数に渡すと、 このDAGに関連するすべてのジンジャーテンプレートで を{{ 'world' | hello }}にすることができます。

dag = DAG(
    ... 
    user_defined_filters={'fromjson': lambda s: json.loads(s)}, 
) 

t1 = SimpleHttpOperator(
    task_id='job', 
    xcom_push=True, 
    ... 
) 

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}', 
    ... 
) 

しかし、それだけであなただけの直接テンプレートに{{ti.xcom_pull("job")["jobId"]を参照できるようにreturning前にJSONを解析し、独自のカスタムJsonHttpOperatorplugin(またはSimpleHttpOperatorにフラグを追加)を書くためにきれいかもしれません。

class JsonHttpOperator(SimpleHttpOperator): 

    def execute(self, context): 
     text = super(JsonHttpOperator, self).execute(context) 
     return json.loads(text) 
1

また、実行してテンプレートにjsonモジュールを追加することも可能であるとJSONは、テンプレート内の使用のために利用できるようになります。ただし、ダニエル氏のようにプラグインを作成する方がいいでしょう。その後、

dag = DAG(
    'dagname', 
    default_args=default_args, 
    schedule_interval="@once", 
    user_defined_macros={ 
     'json': json 
    } 
) 

finish_job = HttpSensor(
    task_id="finish_job", 
    endpoint="kue/job/{{ json.loads(ti.xcom_pull('job'))['jobId'] }}", 
    response_check=lambda response: True if response.json()['state'] == "complete" else False, 
    poke_interval=5, 
    dag=dag 
) 
関連する問題