実行中のステップに応じて気流内のSubDAGを持っています(通常、約2時間です。 1.7.1.3では、このステップは一貫してAIRFLOW-736を引き起こし、すべてのステップが成功したときにSubDAGが「実行中」の状態で停止します。 SubDAGOperatorを手動でデータベース内で(実行するのではなく)マークすることで、SubDAGの後にステップがないので、これを回避することができました。気流 - 1時間後に失敗したとマークされたSubDag内の長時間実行中のタスク
私たちは以下のようにして、今すぐアップグレードをエアフロー1.8.1をテストしている:
- 、PIPを経由して私たちのスケジューラと労働者
- をダウンShuting気流をアンインストールし、Apache-エアフローをインストール(バージョン1.8.1 )
- システムOで気流スケジューラと労働者
を実行
エアフローデータベースのtask_instanceテーブルで状態が「失敗」していることが確認されていますので、タスクが失敗した場合に何が失敗するかを知りたいと思います。自身がまだ実行されている
することはここでは問題をトリガサンプルDAGです:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}
def define_sub(dag, step_name, sleeptime):
op = BashOperator(
task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
)
return dag
def gen_sub_dag(parent_name, step_name, sleeptime):
sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
define_sub(sub, step_name, sleeptime)
return sub
long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
long_sub_dag = SubDagOperator(
subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
今日、同じ問題、1つの長い実行中のタスクを持つサブダグを実行しました。少し時間がたってから、エラーメッセージが表示されました。面白いことに、スケジューラは、空気の流れのブロックされたリソース外れによって失敗したタスクを再開しようとしました。元のタスクは引き続き実行され、正しく終了しました。タスクが終了する前に、エアフローによってサブダグが失敗したとマークされました。 –
どのエグゼキュータを使用していますか。それはセロリ+レディスですか? –