おはようございます。Apache Airflow - 完了時にDAGを再実行する(ファイルセンサー)
私はネットワークフォルダのセットアップにDAGすぎ
- ウォッチ/感覚をしようとしている
チュートリアルオンラインとstackoverflow私は目標を達成する次のDAGとオペレータを考え出すことができましたが、私はDAGを再スケジューリングして、完了時に再実行してwatcを開始するようにします他のファイルのヒンジ/センシング。
変数
max_active_runs:1
を設定しようとしましたが、schedule_interval: timedelta(seconds=5)
がこの場合はDAGが再スケジュールされますが、キューイングタスクが開始され、ファイルがロックされます。archive_taskの後にどのようにDAGを再実行することができますか?
おかげ
DAG CODE
from airflow import DAG from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator from datetime import datetime, timedelta from airflow.models import Variable default_args = { 'owner': 'glsam', 'depends_on_past': False, 'start_date': datetime.now(), 'provide_context': True, 'retries': 100, 'retry_delay': timedelta(seconds=30), 'max_active_runs': 1, 'schedule_interval': timedelta(seconds=5), } dag = DAG('test_sensing_for_a_file', default_args=default_args) filepath = Variable.get("soucePath_Test") filepattern = Variable.get("filePattern_Test") archivepath = Variable.get("archivePath_Test") sensor_task = OmegaFileSensor( task_id='file_sensor_task', filepath=filepath, filepattern=filepattern, poke_interval=3, dag=dag) def process_file(**context): file_to_process = context['task_instance'].xcom_pull( key='file_name', task_ids='file_sensor_task') file = open(filepath + file_to_process, 'w') file.write('This is a test\n') file.write('of processing the file') file.close() proccess_task = PythonOperator( task_id='process_the_file', python_callable=process_file, dag=dag) archive_task = ArchiveFileOperator( task_id='archive_file', filepath=filepath, archivepath=archivepath, dag=dag) sensor_task >> proccess_task >> archive_task
FILEセンサOPERATOR
import os import re from datetime import datetime from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults from airflow.operators.sensors import BaseSensorOperator class ArchiveFileOperator(BaseOperator): @apply_defaults def __init__(self, filepath, archivepath, *args, **kwargs): super(ArchiveFileOperator, self).__init__(*args, **kwargs) self.filepath = filepath self.archivepath = archivepath def execute(self, context): file_name = context['task_instance'].xcom_pull( 'file_sensor_task', key='file_name') os.rename(self.filepath + file_name, self.archivepath + file_name) class OmegaFileSensor(BaseSensorOperator): @apply_defaults def __init__(self, filepath, filepattern, *args, **kwargs): super(OmegaFileSensor, self).__init__(*args, **kwargs) self.filepath = filepath self.filepattern = filepattern def poke(self, context): full_path = self.filepath file_pattern = re.compile(self.filepattern) directory = os.listdir(full_path) for files in directory: if not re.match(file_pattern, files): return False else: context['task_instance'].xcom_push('file_name', files) return True class OmegaPlugin(AirflowPlugin): name = "omega_plugin" operators = [OmegaFileSensor, ArchiveFileOperator]
偉大な方法を働いたありがとうございました。 –