2017-06-24 14 views
1

FTPサーバー上のファイルをチェックするダグがあります(エアフローは別のサーバーで実行されます)。ファイルが存在する場合、ファイルはS3に移動します(ここでアーカイブします)。そこから、ファイル名がSparkサブミットジョブに渡されます。スパークジョブは、S3(異なるサーバー上のスパーククラスタ)を介してファイルを処理します。私は複数のダグを持つ必要があるかどうかはわかりませんが、ここではその流れがあります。私が探しているのは、S3バケットにファイルが存在する場合にのみスパークジョブを実行することです。エアフローダグの途中でタスクを途中で終了する方法はありますか?

S3センサーを使用しようとしましたが、タイムアウト基準を満たした後にタイムアウトに失敗するため、ダグ全体が失敗に設定されます。

check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done 

私は唯一つまたは複数のファイルをS3に移動されたONLY FTPチェックを行うスクリプトの後、すべてを実行します。

答えて

1

2つの異なるDAGを持つことができます。 1人はS3センサーしか持っておらず、5分ごとに走り続けます。ファイルを検出すると、2番目のDAGがトリガーされます。 2番目のDAGはファイルをS3に提出し、完了したらアーカイブします。最初のDAGでTriggerDagRunOperatorを使用してトリガすることができます。

+0

ファイルが見つからない場合はどうなりますか?エラーコードが出ても終了しませんか?したがって、誰かが仕事を再トリガーする必要がありますか? – luckytaxi

+0

最初のDAG(S3SensorとTriggerDagRunOperatorという2つのタスクがあります)は、5分ごとに実行するようにスケジュールできます。つまり、センサーは5分ごとに実行され、ファイルが検出された場合は2番目のDAGがトリガーされます。それ以外の場合は、何もせずに5分後に再実行します。エラーコードで終了するかどうかは関係ありません(最初のDAGではdepends_on_pastをtrueに設定しないでください)。 – Him

0

彼が与えた答えはうまくいくでしょう。 もう1つのオプションは、Sensorが持つ "soft_fail"パラメータ(BaseSensorOperatorのパラメータ)を使用しています。このパラメータをTrueに設定すると、タスクが失敗するのではなく、タスクがスキップされ、ブランチ内のすべての後続タスクもスキップされます。

詳細はairflow codeを参照してください。

関連する問題