2017-12-11 9 views
1

私はカスタムエアフローオペレータを作ったが、このオペレータは入力を受け取り、このオペレータの出力はXCOM上にある。あるオペレータから別のオペレータにデータを渡す方法

私は何を達成したいことは、いくつかの定義された入力をオペレータに呼び出す支店オペレータの内部呼び出し可能なPythonのような出力を解析し、同じ演算子ツリーを呼び出し、別のタスクに解析された出力を渡すことです:

CustomOperator_Task1 = CustomOperator(
    data={ 
     'type': 'custom', 
     'date': '2017-11-12' 
    }, 
    task_id='CustomOperator_Task1', 
    dag=dag) 

data = {} 
def checkOutput(**kwargs): 
    result = kwargs['ti'].xcom_pull(task_ids='CustomOperator_Task1') 

    if result.success = True: 
     data = result.data 
     return "CustomOperator_Task2" 
    return "Failure" 

BranchOperator_Task = BranchPythonOperator(
    task_id='BranchOperator_Task ', 
    dag=dag, 
    python_callable=checkOutput, 
    provide_context=True, 
    trigger_rule="all_done") 

CustomOperator_Task2 = CustomOperator(
    data= data, 
    task_id='CustomOperator_Task2', 
    dag=dag) 

CustomOperator_Task1 >> BranchOperator_Task >> CustomOperator_Task2 

タスクCustomOperator_Task2で、私はBranchOperator_Taskから解析されたデータを渡したいと思います。今は常に空です{}

これを行うにはどうすればよいですか?

+0

カスタム演算子のスニペットを含めることはできますか? 'execute()'メソッドはXComにプッシュする出力値を返しますか、 'xcom_push()'を明示的に呼び出していますか? –

+0

xcom_pushを明示的には行いませんが、オペレータからの返信はxcomに(気流によって)プッシュされ、BranchOperator_Taskのpython_callableを読むことができます。 – whoisthis

+0

カスタム演算子を拡張して呼び出し可能な関数がデータを取得し、関数をデータパラメータに渡してから、関数が返すものを決定できるようにすることで、機能するようになりました。 – whoisthis

答えて

0

コメントに示されているように、カスタム演算子の戻り値はNoneであるため、xcom_pullは空であると予想されます。 気流のデフォルト動作が時間とともに変化する可能性があるので、xcom_pushを明示的に使用してください。

+0

カスタムオペレータの戻り値は 'なし'ではありません:-('checkOutput'で印刷したときのオブジェクトが正しく表示されます – whoisthis

+0

カスタムオペレータに関数を渡すことがわかりました(カスタムオペレータを変更しました)。今はxcomから読んで返していますが、xcomへの明示的なプッシュを行うことをお勧めします。 – whoisthis

0

問題が表示されます。あなたのような変数をdataに設定することは、気流の仕組みのせいで動作しません。全く異なるプロセスが次のタスクを実行するため、dataのコンテキストが設定されません。

代わりに、BranchOperator_Taskは、解析された出力を別のXComにプッシュする必要があります。したがって、CustomOperator_Task2は明示的にフェッチできます。

def checkOutput(**kwargs): 
    ti = kwargs['ti'] 
    result = ti.xcom_pull(task_ids='CustomOperator_Task1') 

    if result.success: 
     ti.xcom_push(key='data', value=data) 
     return "CustomOperator_Task2" 
    return "Failure" 

BranchOperator_Task = BranchPythonOperator(
    ...) 

CustomOperator_Task2 = CustomOperator(
    data_xcom_task_id=BranchOperator_Task.task_id, 
    data_xcom_key='data', 
    task_id='CustomOperator_Task2', 
    dag=dag) 

オペレータが次のように表示されることがあります。

class CustomOperator(BaseOperator): 

    @apply_defaults 
    def __init__(self, data_xcom_task_id, data_xcom_key, *args, **kwargs): 
     self.data_xcom_task_id = data_xcom_task_id 
     self.data_xcom_key = data_xcom_key 
    def execute(self, context): 
     data = context['ti'].xcom_pull(task_ids=self.data_xcom_task_id, key=self.data_xcom_key) 
     ... 

パラメータをハードコードするだけの場合は、パラメータを入力する必要はありません。ユースケースによって異なります。