2017-03-08 6 views
3

非常に頻繁に、私がダウンロードしているファイルのファイル名に日付があります。エアフローでパラメータを使用する例?

csat_surveys_2017_03_05.csv 
03062017_roster.csv 

私のコードはこれを個別に扱います。

  • は、ファイル名を追加し、私が処理した各ファイルについて
  • (一部の日付が現在の日付までの範囲)が存在しなければならないと予想日付で(ファイル名をスライスに基づいて)処理されたファイルの一覧で日付を比較しますテーブルに追加されていない新しいファイルのみを処理します。

このロジックをコーディングする必要性を置き換えるために気流スケジュールの日付を使用することはできますか?毎日、私の仕事は走る予定です。私はその予定日(マイナス1日、おそらく)をとり、読み込むファイル名の一部として(パンダで)渡すパラメータとしてその値を使用します。もしそうなら、私はテンプレートとして使うことができる明確な例を見ていただけますか?

ファイルが欠落している、または数日遅れている場合は、私にはカバーされます(私はタスクが失敗してしまい、それが成功するまで、クライアントに問題を提起する)

答えて

0

私は「はい」と言いますが、execution_dateを使用するのがおそらくベストプラクティスです。

アクセスするには、テンプレートフィールドが必要です。いくつかのデフォルトの演算子は、すでにそれらを持っている、またはあなたは、このようなものになりますあなた自身のオペレータ、作成する必要があります

my_task = MyOperator(
    task_id='t1', 
    filename='prefix_{{ ds }}_suffix') 

dsがある:あなたのDAGで

をとして、あなたは仕事を持っていると思いますexecution_dateパラメータに日付の文字列表現としてアクセスするためのエアフローマクロ。

そして、あなたのMyOperatorは次のようになります。

class MyOperator(BaseOperator): 
    template_fields = ('filename') 

    def __init__(self, filename) 
     self.filename = filename 

    def execute(self, context): 
     download_file(self.filename) 
     do_other_stuff() 
あなたはマクロセクションのタスクをパラメータ化する方法についての詳細を見つけることができます

https://airflow.incubator.apache.org/code.html#macros

関連する問題