私はサードパーティのサービスでジョブを起動し、そのジョブの進行状況を監視するAirflowオペレータを持っています。エアフローの労働者が(通常は原因コードのデプロイに)再起動した場合のコードでは、このタスクのインスタンスが実行されているとき、私はすべきタスクの再開たとえばたい再起動されたAirflow TaskInstanceの状態の回復
def execute(self, context):
external_id = start_external_job()
wait_until_external_job_completes(external_id)
のようなルックスを実行します以前のサービスが中止された場所(第三者サービスのジョブを監視する)。後で同じタスクインスタンスを実行しても、そのサードパーティのジョブIDを共有する方法はありますか?
メソッドを実行高めの例は次のようになります。
def execute(self, context):
external_id = load_external_id_for_task_instance()
if external_id is None:
external_id = start_external_job(args)
persist_external_id_for_task_instance(external_id)
wait_until_external_job_completes(external_id)
そして私はload_external_id_for_task_instance
とpersist_external_id_for_task_instance
を実装する必要があります。
ありがとうございます!私は実装しましたが、これは魅力的なように機能しています! –