2017-10-04 12 views
0

私はサードパーティのサービスでジョブを起動し、そのジョブの進行状況を監視するAirflowオペレータを持っています。エアフローの労働者が(通常は原因コードのデプロイに)再起動した場合のコードでは、このタスクのインスタンスが実行されているとき、私はすべきタスクの再開たとえばたい再起動されたAirflow TaskInstanceの状態の回復

def execute(self, context): 
    external_id = start_external_job() 
    wait_until_external_job_completes(external_id) 

のようなルックスを実行します以前のサービスが中止された場所(第三者サービスのジョブを監視する)。後で同じタスクインスタンスを実行しても、そのサードパーティのジョブIDを共有する方法はありますか?

メソッドを実行高めの例は次のようになります。

def execute(self, context): 
    external_id = load_external_id_for_task_instance() 
    if external_id is None: 
     external_id = start_external_job(args) 
     persist_external_id_for_task_instance(external_id) 

    wait_until_external_job_completes(external_id) 

そして私はload_external_id_for_task_instancepersist_external_id_for_task_instanceを実装する必要があります。

答えて

0

XComsSensorsの2つのタスクに分割することをお勧めします。

あなたが仕事を提出し、XCOMにIDを保存し1つのオペレータ持つことができますので、

class JobCompleteSensor(BaseSensor): 

    @apply_defaults 
    def __init__(self, submit_task_id, *args, **kwargs): 
     self.submit_task_id = submit_task_id # so we know where to fetch XCom value from 
     super(JobCompleteSensor, self).__init__(*args, **kwargs) 

    def poke(self, context): 
     external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id) 
     return check_if_external_job_is_complete(external_id): 

:その後、完了するまでXCOMと世論調査からIDをフェッチセンサーを

class SubmitJobOperator(BaseOperator): 

    def execute(self, context): 
     external_id = start_external_job() 
     return external_id # return value will be stored in XCom 

をDAGは次のようになります。

submit_job = SubmitJobOperator(
    dag=dag, 
    task_id='submit_job', 
) 

wait_for_job_to_complete = JobCompleteSensor(
    dag=dag, 
    task_id='wait_for_job_to_complete', 
    submit_task_id=submit_job.task_id, 
) 

submit_job >> wait_for_job_to_complete 

XComsは、 ensorは常に以前に提出されたexternal_idを見つけることができます。

+0

ありがとうございます!私は実装しましたが、これは魅力的なように機能しています! –

関連する問題