2016-12-10 8 views
1

airflowを初めて使用しています。私はダッグを実行しようとしているし、スケジューリングをしたくない。日付がないエアフローの例

私はコマンドライン引数でパイプラインを実行し、現在の出力すべてを無効にしたいと考えています。私は開始日がなく、スケジューリングもタイミングも、リトライロジックもありません。開始するには、一連の関数を順番に実行したいだけです。

ドキュメントには常に日付が含まれています。

airflow test tutorial print_date 2015-06-01 

ダグを実行して、すべての機能を実行し、以前の実行を無視します。私のダッグから日付と日付のロジックをすべて削除するにはどうすればよいですか?

airflow run tutorial_me remove_file 2015-01-04 

それは作品とprint "************ THIS IS WHERE STDOUT GOES"行を出力します。私はそれを実行

""" 
Code that goes along with the Airflow tutorial located at: 
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py 
""" 
import os 
import cPickle 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
from datetime import datetime 


default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2015, 6, 1), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'schedule_interval': '@once' 
} 

dag = DAG('tutorial_me', default_args=default_args) 

def save_file(filenm): 
    with open(filenm, 'wb') as pickle_file: 
     cPickle.dump(['1','2',3], pickle_file) 

def delete_file(filenm): 
    print "************ THIS IS WHERE STDOUT GOES" 
    if os.path.exists(filenm): 
     os.path.remove(filenm) 


# t1, t2 and t3 are examples of tasks created by instantiating operators 
t1 = PythonOperator(
    task_id='save_file', 
    python_callable=save_file, 
    op_kwargs=dict(filenm='__myparamfile__.txt'), 
    dag=dag) 

t2 = PythonOperator(
    task_id='remove_file', 
    python_callable=delete_file, 
    op_kwargs=dict(filenm='__myparamfile__.txt'), 
    dag=dag) 

t1.set_upstream(t2) 

初めて:

私はチュートリアルDAGファイルの修正版を持っています。私はそれを2回目に実行しますが、そうではありません。セカンドラン

cat 2015-01-04T00\:00\:00 
[2016-12-10 11:27:47,158] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags 
[2016-12-10 11:27:47,214] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:47,214] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:47,227] {base_executor.py:36} INFO - Adding to queue: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py 
[2016-12-10 11:27:47,234] {sequential_executor.py:26} INFO - Executing command: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py 
[2016-12-10 11:27:48,050] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py 
[2016-12-10 11:27:48,101] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:48,102] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:48,942] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py 
[2016-12-10 11:27:48,998] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:48,998] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:49,020] {models.py:1196} INFO - 
-------------------------------------------------------------------------------- 
Starting attempt 1 of 1 
-------------------------------------------------------------------------------- 

[2016-12-10 11:27:49,046] {models.py:1219} INFO - Executing <Task(PythonOperator): remove_file> on 2015-01-04 00:00:00 
[2016-12-10 11:27:49,054] {python_operator.py:67} INFO - Done. Returned value was: None 
[2016-12-10 11:27:55,168] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags 
[2016-12-10 11:27:55,219] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:55,220] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:55,231] {base_executor.py:36} INFO - Adding to queue: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py 
[2016-12-10 11:27:55,236] {sequential_executor.py:26} INFO - Executing command: airflow run tutorial_me remove_file 2015-01-04T00:00:00 --local -sd DAGS_FOLDER/tutorial_01.py 
[2016-12-10 11:27:56,030] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py 
[2016-12-10 11:27:56,082] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:56,082] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:56,899] {models.py:154} INFO - Filling up the DagBag from /Users/user_01/airflow/dags/tutorial_01.py 
[2016-12-10 11:27:56,950] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): save_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:56,951] {models.py:1750} WARNING - schedule_interval is used for <Task(PythonOperator): remove_file>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead 
[2016-12-10 11:27:56,967] {models.py:1150} INFO - 
+1

で公式ドキュメントに記述されています。あなたは[mcve]の提案に沿ってあなたの質問に言い返すことができますか? – boardrider

答えて

1

エアフローは、そのDAGの歴史を維持するように設計された後

ログファイルには、次のようになりますので、それがために、データのバッチを処理し、各タスクがために一度だけ実行されることを確認することができます動作しますそのDagRun。

最も簡単なことは、おそらくスケジューラを無視して、実行日が "now"のDagRunを外部からトリガすることです(完全な日付と時刻を含む)。これにより、呼び出す各実行ですべてのタスクが正確に1回実行され、タスクの各実行は以前の実行とは独立して実行されます。 depends_on_past = Falseが必要で、max_active_runsも非常に大きな値にする必要があります。失敗したDagRunは「アクティブ」のままですが、新しい呼び出しを妨げないようにします。

1

私はあなたの要求がairflow issue #198に似ていると信じています:

「我々は唯一のタスクインスタンスの実行のシリーズの最新作を実行し、スキップとして他人をマークする必要がある場合については、例えば、我々はことがあります。毎日DBスナップショットを実行するジョブがあります.DAGが5日間停止してから一時停止されていない場合は、最新のものだけをすべて実行したくないので、タスクスケジューリングにcron機能を提供しますこれはETLに関連していません」

この問題は、LatestOnlyOperator featu文書番号hereに記載されています。

使い方はあなたが少しあまりにも多くのデータを与えられ、そして地元の人々を離れて怖がっている可能性がhttps://airflow.apache.org/concepts.html#latest-run-only

from airflow.operators.latest_only_operator import LatestOnlyOperator 

dag = DAG(
    dag_id='latest_only_with_trigger', 
    schedule_interval=dt.timedelta(hours=4), 
    start_date=dt.datetime(2016, 9, 20), 
) 

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) 
関連する問題