2016-11-09 9 views
0

達成しようとしているもの データベースを使用して異なるタイミングで同様のタスクをスケジュールするスケジューラを作成します。私はセロリビートを使用しています同じのためにセロリビートを使用して複数のタイミングでタスクをスケジューリングしますが、タスクは1回だけ実行します(ランダムパラメータを使用)

、以下のコードスニペットは、アイデアに

try: 
    reader = MongoReader() 
except: 
    raise 
try: 
    tasks = reader.get_scheduled_tasks() 
except: 
    raise 
celerybeat_schedule = dict() 
for task in tasks: 
    celerybeat_schedule[task["task_id"]] =dict() 
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"] 
    celerybeat_schedule[task["task_id"]]["args"] = (task,) 
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task) 

app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule) 

を与えるので、これらの3つのステップ です - データストア からすべてのタスクを読んで - 辞書を作成し、あるセロリスケジューラ

セロリ構成でこれを更新するすべてのENTR所与期待シナリオ - 特性、TASK_NAME(実行されます方法)、パラメータ(データはメソッドに渡す)、スケジュール(格納時に実行する)を有するすべてのタスク によって移入IEはただのプリントは、同じスケジュールを印刷するかを指定するさまざまなパラメータを有する、5分ごとに実行する必要が同じセロリのタスク名を実行し、DBは

task name  , parameter , schedule 
regular_print , Hi  , {"minutes" : 5} 
regular_print , Hello  , {"minutes" : 5} 
regular_print , Bye  , {"minutes" : 5} 

は私が期待して、これらは5分ごとに印刷する必要があると言うことができます、

が助けてください(確実ではない順序で、ランダムに可能) 事前にどうもありがとう:)

答えて

0

Wを3つのすべてのこんにちは、こんにちはの つだけ何が起こる

、さようならプリントを印刷しますセロリのバージョン4を使用してこれを解決できるようになりました。私のために働いたものに似たサンプル..バージョン4のセロリの文書でも見つけることができます

#taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"] 
    ex_port_queue = os.environ["EX_PORT_QUEUE"] 
    ex_user_queue = os.environ["EX_USERID_QUEUE"] 
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"] 
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//" 

    #celery initialization 
    app = Celery(__name__,backend=broker, broker=broker) 
    app.conf.task_default_queue = 'scheduler_queue' 
    app.conf.update(
     task_serializer='json', 
     accept_content=['json'], # Ignore other content 
     result_serializer='json' 
    ) 
task = {"task_id":1,"a":10,"b":20} 
##method to update scheduler 
def add_scheduled_task(task): 
    print("scheduling task") 
    del task["_id"] 
    print("adding task_id") 
    name = task["task_name"] 
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])  

@app.task(name='scheduler_task') 
def scheduler_task(data): 
    print(str(data["a"]+data["b"]))