2016-08-26 22 views
0

私は気流を言う必要があり、cronを実行すると非常に混乱します。私はちょうど今から5分ごとにcronを開始したい。複雑なダグを作成するのは簡単です。 docに基づいてcronを実行するロジックを解明することはできません。気流とcron - スケジューリングは5分を実行しても機能しません

以下のコードを実行するとどうなりますか?これまでに何度も印刷が実行されています。

1 2016-08-26 17:17:01.584360 
2 2016-08-26 17:17:08.124035 
3 2016-08-26 17:17:15.293874 
1 2016-08-26 17:17:24.100623 
2 2016-08-26 17:17:31.637739 
3 2016-08-26 17:17:37.919901 
1 2016-08-26 17:17:45.255641 
2 2016-08-26 17:17:52.859954 
3 2016-08-26 17:17:59.048536 
1 2016-08-26 17:18:06.175670 
2 2016-08-26 17:18:12.759000 
3 2016-08-26 17:18:20.112758 
1 2016-08-26 17:18:26.909130 
2 2016-08-26 17:18:34.396926 
on and on....WOWEE 

以下は私のコードです。非常に少なくとも

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators import PythonOperator 
from airflow.operators import TriggerDagRunOperator 
from airflow.operators import DummyOperator 
from datetime import datetime, timedelta 

def checkfornewdata(): 
    f = open('/tmp/first_text.log','a') 
    f.write('1 %s\n' % datetime.now()) 
    f.close() 
    return True 

def fetchdata(): 
    f = open('/tmp/first_text.log','a') 
    f.write('2 %s\n' % datetime.now()) 
    f.close() 
    return True 

def uploadtoes(): 
    f = open('/tmp/first_text.log','a') 
    f.write('3 %s\n' % datetime.now()) 
    f.close() 
    return True 


mytime = datetime.combine(datetime.now()-timedelta(minutes=5), 
            datetime.min.time()) 


default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    "start_date": mytime, 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': True, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

# */5 * * * * 
dag = DAG('first_test', schedule_interval="*/5 * * * *", default_args=default_args) 

node_0 = PythonOperator(
    task_id='isnewdata', 
    provide_context=False, 
    python_callable=checkfornewdata, 
    dag=dag) 


node_0_1 = PythonOperator(
    task_id='fetchdata', 
    provide_context=False, 
    python_callable=fetchdata, 
    dag=dag) 

node_0_1_2 = PythonOperator(
    task_id='uploadtoes', 
    provide_context=False, 
    python_callable= uploadtoes, 
    dag=dag) 


node_0_1.set_upstream(node_0) 
node_0_1_2.set_upstream(node_0_1) 
+0

気流が死んでいるようです。保守担当者は応答しません。 – Tampa

+0

気流は決して死んだプロジェクトではありません。 https://github.com/apache/incubator-airflow/graphs/contributors – russellpierce

答えて

0

、あなたは非動的時間を使用するように"start_date": mytimeを変更したいと思います。特にthe FAQを参照してください。

関連する問題