私はカスタムエアフローオペレータを作ったが、このオペレータは入力を受け取り、このオペレータの出力はXCOM上にある。あるオペレータから別のオペレータにデータを渡す方法
私は何を達成したいことは、いくつかの定義された入力をオペレータに呼び出す支店オペレータの内部呼び出し可能なPythonのような出力を解析し、同じ演算子ツリーを呼び出し、別のタスクに解析された出力を渡すことです:
CustomOperator_Task1 = CustomOperator(
data={
'type': 'custom',
'date': '2017-11-12'
},
task_id='CustomOperator_Task1',
dag=dag)
data = {}
def checkOutput(**kwargs):
result = kwargs['ti'].xcom_pull(task_ids='CustomOperator_Task1')
if result.success = True:
data = result.data
return "CustomOperator_Task2"
return "Failure"
BranchOperator_Task = BranchPythonOperator(
task_id='BranchOperator_Task ',
dag=dag,
python_callable=checkOutput,
provide_context=True,
trigger_rule="all_done")
CustomOperator_Task2 = CustomOperator(
data= data,
task_id='CustomOperator_Task2',
dag=dag)
CustomOperator_Task1 >> BranchOperator_Task >> CustomOperator_Task2
タスクCustomOperator_Task2
で、私はBranchOperator_Task
から解析されたデータを渡したいと思います。今は常に空です{}
これを行うにはどうすればよいですか?
カスタム演算子のスニペットを含めることはできますか? 'execute()'メソッドはXComにプッシュする出力値を返しますか、 'xcom_push()'を明示的に呼び出していますか? –
xcom_pushを明示的には行いませんが、オペレータからの返信はxcomに(気流によって)プッシュされ、BranchOperator_Taskのpython_callableを読むことができます。 – whoisthis
カスタム演算子を拡張して呼び出し可能な関数がデータを取得し、関数をデータパラメータに渡してから、関数が返すものを決定できるようにすることで、機能するようになりました。 – whoisthis