2017-07-25 8 views
4

私のDAGファイルでは、失敗した場合にスラックを投稿するon_failure_callback()関数を定義しています。エアフローのデフォルトon_failure_callback

私はDAG内の各演算子のために指定した場合、それはうまく機能:on_failure_callback = on_failure_callback()

は、(例えば、または私のDAGオブジェクト経由でdefault_args経由)の全てへの発送を自動化する方法はあります私の演算子?

+0

興味深い質問としてあなたon_failure_callbackを渡すことができ

、on_failure_callbackがBaseOperatorで定義されていた、私は考えることができる唯一の方法は、あなた自身のオペレータを作成して、BaseOperatorから継承し、その後、あなたのon_failure_callbackを渡すことです() そこ。他の人の考え方を見たいと思っています – Chengzhi

+0

あなたのご意見ありがとうございますが、私はBaseOperatorのように何かを基本として変更することに自信がありませんでした。私はそれを各オペレータに手動で追加することを好むが、BaseOperatorの更新を逃さないようにすることが望ましい(メンテナンスは少ない) –

答えて

5

私は最終的にこれを行う方法を見つけました。あなたはdefault_args

class Foo: 
    @staticmethod 
    def get_default_args(): 
     """ 
     Return default args 
     :return: default_args 
     """ 

     default_args = { 
      'on_failure_callback': Foo.on_failure_callback 
     } 

     return default_args 

    @staticmethod 
    def on_failure_callback(context): 
    """ 
    Define the callback to post on Slack if a failure is detected in the Workflow 
    :return: operator.execute 
    """ 

    operator = SlackAPIPostOperator(
     task_id='failure', 
     text=str(context['task_instance']), 
     token=Variable.get("slack_access_token"), 
     channel=Variable.get("slack_channel") 
    ) 

    return operator.execute(context=context) 
関連する問題