2016-12-12 11 views
5

別のタスクグループの終わりにCeleryを使って周期的なタスクを動的にスケジュールします。Celery - 別のタスクの最後に定期的なタスクをスケジュールします。

私はセロリと(静的)定期的なタスクを作成する方法を知っている:

CELERYBEAT_SCHEDULE = { 
     'poll_actions': { 
      'task': 'tasks.poll_actions', 
      'schedule': timedelta(seconds=5) 
     } 
} 

をしかし、私は自分のタスクから動的に定期的なジョブを作成したい(そしておそらくいくつかの条件があるときに、それらの定期的なジョブを停止する方法を持っています達成(すべてのタスクが実行される)のような

何か:。

@celery.task 
def run(ids): 
    group(prepare.s(id) for id in ids) | execute.s(ids) | poll.s(ids, schedule=timedelta(seconds=5)) 

@celery.task 
def prepare(id): 
    ... 

@celery.task 
def execute(id): 
    ... 

@celery.task 
def poll(ids): 
    # This task has to be schedulable on demand 
    ... 

答えて

3

これに対する簡単な解決策は、削除/ビートスケジューラENTRを追加できるようにする必要があり飛んでいる。

How to dynamically add/remove periodic tasks to Celery (celerybeat)

...この質問の解答のように、これは不可能でした。私はそれが暫定的に利用可能になったことを疑う...

あなたはここで2つのコンセプトを融合させています。 「イベントドリブンワーク」の概念と「バッチスケジュール駆動ワーク」の考え方()は、イベントがスケジュールで発生する最初のケースです)。ここで何をしているのか本当に考えてみると、かなり複雑なエッジケースがあることがわかります。メッセージは自然界に分散しています。異なるメッセージから生まれたグループが競合するエントリの作成を開始するとどうなりますか?以前予定されていたkruftの山の上にいるとき、あなたは何をしていますか?

メッセージングシステムを使用している場合、実際には再帰的なツリーを構築しようとしています。何かをし、より多くのことをするためにもっと多くのメッセージを生み出す仕事のスピンドル。これらを除いて(意図されているかどうかにかかわらず)サイクルは最終的に基本ケースを達成して終了します。

実際に達成しようとしていることへの答えは、メッセージングシステムと非同期作業フレームワークの制限内で問題を再エンコードすることにあります。

関連する問題