2017-03-28 4 views
0

私は比較的新しいairflowプロジェクトを使用しています。私は書かれたDAGをたくさん持っています。今、バグ報告サービスを統合して、DAGのいずれかのコードが例外を発生させた場合、情報が特定のAPIに送信されるようにします。私は各DAGのon_failure_callbackにAPIコールを置くことができますが、一度だけ実行する必要があるbug_reporter.init(bug_reporter_token)のような初期化行を実行する必要があります。すべてのエアフローDAGの前にコードを実行

コードを初期化するためのエアフローの場所はありますか?今はすべてのDAG定義ファイルの始めにバグトラッカーを初期化しています。これは重複しているようですが、DAGが定義される前に実行されるファイルを書き込む場所を見つけることができません。私は約pluginsについて読んでみましたが、そこにはないようです。代わりに、DAGのあなたのDAG定義ファイルで

答えて

0

、独自のサブクラスを使用します。

from airflow.utils.decorators import apply_defaults 
import bug_reporter 

class DAGWithBugReporter(DAG): 
    @apply_defaults 
    def __init__(
     self, 
     bug_reporter_token, 
     *args, **kwargs): 

     super(DAGWithBugReporter, self).__init__(*args, **kwargs) 
     bug_reporter.init(bug_reporter_token) 

その後、あなたのDAGの定義で:

dag = DAGWithBugReporter(
    dag_id='my_dag', 
    schedule_interval=None, 
    start_date=datetime(2017, 2, 26), 
    bug_reporter_token=my_token_from_somewhere 
) 


t1 = PythonOperator(
    task_id='t1', 
    provide_context=True, 
    python_callable=my_callable, 
    xcom_push=True, 
    dag=dag) 
関連する問題