FTPサーバー上のファイルをチェックするダグがあります(エアフローは別のサーバーで実行されます)。ファイルが存在する場合、ファイルはS3に移動します(ここでアーカイブします)。そこから、ファイル名がSparkサブミットジョブに渡されます。スパークジョブは、S3(異なるサーバー上のスパーククラスタ)を介してファイルを処理します。私は複数のダグを持つ必要があるかどうかはわかりませんが、ここではその流れがあります。私が探しているのは、S3バケットにファイルが存在する場合にのみスパークジョブを実行することです。エアフローダグの途中でタスクを途中で終了する方法はありますか?
S3センサーを使用しようとしましたが、タイムアウト基準を満たした後にタイムアウトに失敗するため、ダグ全体が失敗に設定されます。
check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done
私は唯一つまたは複数のファイルをS3に移動されたONLY FTPチェックを行うスクリプトの後、すべてを実行します。
ファイルが見つからない場合はどうなりますか?エラーコードが出ても終了しませんか?したがって、誰かが仕事を再トリガーする必要がありますか? – luckytaxi
最初のDAG(S3SensorとTriggerDagRunOperatorという2つのタスクがあります)は、5分ごとに実行するようにスケジュールできます。つまり、センサーは5分ごとに実行され、ファイルが検出された場合は2番目のDAGがトリガーされます。それ以外の場合は、何もせずに5分後に再実行します。エラーコードで終了するかどうかは関係ありません(最初のDAGではdepends_on_pastをtrueに設定しないでください)。 – Him