私は最終的にこれを行う方法を見つけました。あなたはdefault_args
class Foo:
@staticmethod
def get_default_args():
"""
Return default args
:return: default_args
"""
default_args = {
'on_failure_callback': Foo.on_failure_callback
}
return default_args
@staticmethod
def on_failure_callback(context):
"""
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
operator = SlackAPIPostOperator(
task_id='failure',
text=str(context['task_instance']),
token=Variable.get("slack_access_token"),
channel=Variable.get("slack_channel")
)
return operator.execute(context=context)
興味深い質問としてあなたon_failure_callbackを渡すことができ
、on_failure_callbackがBaseOperatorで定義されていた、私は考えることができる唯一の方法は、あなた自身のオペレータを作成して、BaseOperatorから継承し、その後、あなたのon_failure_callbackを渡すことです() そこ。他の人の考え方を見たいと思っています – Chengzhi
あなたのご意見ありがとうございますが、私はBaseOperatorのように何かを基本として変更することに自信がありませんでした。私はそれを各オペレータに手動で追加することを好むが、BaseOperatorの更新を逃さないようにすることが望ましい(メンテナンスは少ない) –