2017-06-26 14 views
2

おはようございます。Apache Airflow - 完了時にDAGを再実行する(ファイルセンサー)

私はネットワークフォルダ
  • プロセスをヒットするファイルのファイル
  • アーカイブファイル使用
  • のセットアップにDAGすぎ

    1. ウォッチ/感覚をしようとしている

      チュートリアルオンラインとstackoverflow私は目標を達成する次のDAGとオペレータを考え出すことができましたが、私はDAGを再スケジューリングして、完了時に再実行してwatcを開始するようにします他のファイルのヒンジ/センシング。

      変数max_active_runs:1を設定しようとしましたが、schedule_interval: timedelta(seconds=5)がこの場合はDAGが再スケジュールされますが、キューイングタスクが開始され、ファイルがロックされます。

      archive_taskの後にどのようにDAGを再実行することができますか?

      おかげ

      DAG CODE

      from airflow import DAG 
      from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator 
      from datetime import datetime, timedelta 
      from airflow.models import Variable 
      
      default_args = { 
          'owner': 'glsam', 
          'depends_on_past': False, 
          'start_date': datetime.now(), 
          'provide_context': True, 
          'retries': 100, 
          'retry_delay': timedelta(seconds=30), 
          'max_active_runs': 1, 
          'schedule_interval': timedelta(seconds=5), 
      } 
      
      dag = DAG('test_sensing_for_a_file', default_args=default_args) 
      
      filepath = Variable.get("soucePath_Test") 
      filepattern = Variable.get("filePattern_Test") 
      archivepath = Variable.get("archivePath_Test") 
      
      sensor_task = OmegaFileSensor(
          task_id='file_sensor_task', 
          filepath=filepath, 
          filepattern=filepattern, 
          poke_interval=3, 
          dag=dag) 
      
      
      def process_file(**context): 
          file_to_process = context['task_instance'].xcom_pull(
           key='file_name', task_ids='file_sensor_task') 
          file = open(filepath + file_to_process, 'w') 
          file.write('This is a test\n') 
          file.write('of processing the file') 
          file.close() 
      
      
      proccess_task = PythonOperator(
          task_id='process_the_file', python_callable=process_file, dag=dag) 
      
      archive_task = ArchiveFileOperator(
          task_id='archive_file', 
          filepath=filepath, 
          archivepath=archivepath, 
          dag=dag) 
      
      sensor_task >> proccess_task >> archive_task 
      

      FILEセンサOPERATOR

      import os 
          import re 
      
          from datetime import datetime 
          from airflow.models import BaseOperator 
          from airflow.plugins_manager import AirflowPlugin 
          from airflow.utils.decorators import apply_defaults 
          from airflow.operators.sensors import BaseSensorOperator 
      
      
          class ArchiveFileOperator(BaseOperator): 
           @apply_defaults 
           def __init__(self, filepath, archivepath, *args, **kwargs): 
            super(ArchiveFileOperator, self).__init__(*args, **kwargs) 
            self.filepath = filepath 
            self.archivepath = archivepath 
      
           def execute(self, context): 
            file_name = context['task_instance'].xcom_pull(
             'file_sensor_task', key='file_name') 
            os.rename(self.filepath + file_name, self.archivepath + file_name) 
      
      
          class OmegaFileSensor(BaseSensorOperator): 
           @apply_defaults 
           def __init__(self, filepath, filepattern, *args, **kwargs): 
            super(OmegaFileSensor, self).__init__(*args, **kwargs) 
            self.filepath = filepath 
            self.filepattern = filepattern 
      
           def poke(self, context): 
            full_path = self.filepath 
            file_pattern = re.compile(self.filepattern) 
      
            directory = os.listdir(full_path) 
      
            for files in directory: 
             if not re.match(file_pattern, files): 
              return False 
             else: 
              context['task_instance'].xcom_push('file_name', files) 
              return True 
      
      
          class OmegaPlugin(AirflowPlugin): 
           name = "omega_plugin" 
           operators = [OmegaFileSensor, ArchiveFileOperator] 
      

    答えて

    1

    設定schedule_interval=Noneと前の1が完了した時点で次の実行を起動するBashOperatorからairflow trigger_dagコマンドを使用します。

    trigger_next = BashOperator(task_id="trigger_next", 
          bash_command="airflow trigger_dag 'your_dag_id'", dag=dag) 
    
    sensor_task >> proccess_task >> archive_task >> trigger_next 
    

    あなたは同じairflow trigger_dagコマンドを使って手動であなたの最初の実行を開始することができ、その後trigger_nextタスクが自動的に次のものをトリガします。私たちはこれを数ヶ月間、生産で使用しています。

    +0

    偉大な方法を働いたありがとうございました。 –

    4

    ドミトリス法は完全に機能しました。

    私もschedule_interval=Noneを設定して、TriggerDagRunOperatorを使用して私の読書で見つかっが均等にも

    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_RBCPV99_rerun', 
        trigger_dag_id="RBCPV99_v2", 
        dag=dag) 
    
    sensor_task >> proccess_task >> archive_task >> trigger 
    
    関連する問題