2017-09-12 13 views
0

エアフローが初めてで、最初のDAGが作成されました。ここに私のDAGコードがあります。私はDAGを開始し、その後は1日に1回実行します。エアフローDAGがスケジュールされていない

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1)) 

create_command = "/home/ubuntu/scripts/makedir.sh " 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command=create_command, 
    dag=dag) 

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh " 
# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id='web_scrawl', 
    bash_command=run_spiders, 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1) 

エアフローによってDAGが選択されていません。私はログをチェックし、ここにそれが何を言っているのです。

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue 
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py 
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode 
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance. 
    'strategies for improved performance.' % expr) 
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat 
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105 
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds 

正確には間違っていますか?私はschedule_intervalをschedule_interval = timedelta(分= 1)に変更して、すぐに開始するかどうかを確認しようとしましたが、まだ使用していません。私はAirflow UIでは期待通りにDAGの下でタスクを見ることができますが、スケジュールステータスは「ステータスなし」です。ここで私を助けてください。私はSTART_DATEのための非常に古い日付を使用してschedule_interval =はtimedelta(分= 10)

1):

+0

あなたはDAGオンがありますUIをオンに切り替えますか? – Chengzhi

+0

はい、ボタンはオンです。それでも、それはピックアップされていません。 – Anju

+0

気流作業員とエアフロースケジューラを呼び出しましたか? –

答えて

0

この問題は、以下の手順で解決されました。また、datetime.now()の代わりに実際の日付を使用しました。
2)DAG引数にcatchup = Trueを追加しました。
3)環境変数をエクスポートとして設定します。AIRFLOW_HOME = pwd/airflow_home。
4)削除済みairflow.db
5)新しいコードをDAGSフォルダに移動しました
6) 'airflow initdb'コマンドを実行して、再度DBを作成します。
7)UI
8を通して私のDAGの「オン」スイッチを回した)ここで、コマンド「気流スケジューラ」

は現在動作するコードである蘭:

from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 9, 12), 
    'email': ['[email protected]'], 
    'retries': 0, 
    'retry_delay': timedelta(minutes=15) 
} 

dag = DAG(
    'alamode', catchup=False, default_args=default_args, schedule_interval="@daily") 

# t1 is the task which will invoke the directory creation shell script 
t1 = BashOperator(
    task_id='create_directory', 
    bash_command='/home/ubuntu/scripts/makedir.sh ', 
    dag=dag) 


# t2 is the task which will invoke the spiders 
t2 = BashOperator(
    task_id= 'web_crawl', 
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ', 
    dag=dag) 

# To set dependency between tasks. 't1' should run before t2 
t2.set_upstream(t1)