0
@daily間隔のスケジューリングとすべてのワークフローのタスクIDを持つDAGが作成されました。しかし、それは例外として実行されていません。このようにすることは可能ですか?特定のDAGの動的タスクを作成する他の方法はありますか?コマンドラインを使用して特定のタスクインスタンスを一時停止するには?動的に作成されたタスク/ダッグがApacheのエアフローで動作しない
from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator,DummyOperator,BranchPythonOperator,SqlSensor
from airflow.models import DAG
from datetime import datetime, timedelta
import time
from pprint import pprint
seven_days_ago = datetime.combine(
datetime.today() - timedelta(7), datetime.min.time())
args = {
'owner': 'varakumar',
'start_date': seven_days_ago,
}
dag = DAG(
dag_id='dynamic_task_creation', default_args=args,
schedule_interval="@daily")
def get_decision():
return "right"
start = DummyOperator(
task_id='start',
dag=dag)
td=datetime.today()
x=str(datetime(td.year,td.month,td.day,td.hour,td.minute,td.second)).replace (" ", "_").replace (":", "-")
pause_task_id = ("pause-%s" % x)
pause = DummyOperator(
task_id=pause_task_id,
dag=dag)
pause.set_upstream(start)
decision = BranchPythonOperator(
task_id='decision',
python_callable=lambda: get_decision(),
dag=dag)
decision.set_upstream(pause)
left = DummyOperator(
task_id='left',
dag=dag)
left.set_upstream(decision)
right = DummyOperator(
task_id='right',
dag=dag)
right.set_upstream(decision)
私が見る最初の問題は、あなたがダイナミックstart_date
を使用していることである事前
これはどのように動作すると思いますか、実際にどのように動作していますか? – Mercury