2017-07-04 26 views
0

私はこの仕事で一日中戦ってきました。実行時にセロリの定期的なタスクを追加して削除する

私はDjangoアプリを持っています。私は非同期タスクのためにCeleryを使用します。時々、私は定期的な仕事を作りたいと思う。タスクが不明に実行される回数。後で削除する必要があります。したがって、タスクは次のようになります。

@shared_task 
def foobar_task(id): 
    if this_should_run: 
     do_task() 
    else: 
     PeriodicTask.objects.get(name='{} task'.format(id)).delete() 

私のアプリは動作しています。私はセクシーなビートをDockerコンテナで実行し、celery --app=myproject beat --loglevel=info --scheduler=djangoを使用して実行します。私は標準のセロリの労働者を実行している別のコンテナを持っています。

これで、定期的なタスクを動的に作成したいと思います。私は定期的なタスクが作成された見ることができ、Djangoのadminに

schedule, _ = IntervalSchedule.objects.get_or_create(every=15, period=IntervalSchedule.SECONDS) 
PeriodicTask.objects.create(interval=schedule, 
          name='{} task'.format(id), 
          task='myapp.tasks.foobar_task') 

:私はこのような何かをトリガービュー/ APIエンドポイントを持っています。しかし、セロリコンテナとセロリビートコンテナの両方のログを見ると、何も起こりません。

セロリがなぜ新しい定期的な仕事があるのではないのですか?新しいタスクが作成または削除されるたびにセロリを打ち返す必要はありません。

注:私はDjango 1.11.2、PostgreSQL、Celery 4.0.2、Django Celery Beat 1.0.1を使用しています。

答えて

1

this answerから順に、以下のようなカスタムスケジューラを作成できます。

あなたがセロリビートを実行すると、このクラスを指すように
from django_celery_beat.schedulers import DatabaseScheduler 

class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = [x for x in new_schedule.keys() if x not in self.schedule.keys()] 
       to_remove = [x for x in self.schedule.keys() if x not in new_schedule.keys()] 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 
     return self._schedule 

celery --app=myproject beat --loglevel=info --scheduler=myproject.scheduler.AutoUpdateScheduler