のビートの導入で、それ以来変わっていた私は物事を期待していた2012年からだ4.0.1+そして、Python 2.7、およびRedisの
from celery import Celery
import os, logging
logger = logging.getLogger(__name__)
current_module = __import__(__name__)
CELERY_CONFIG = {
'CELERY_BROKER_URL':
'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')),
'CELERY_TASK_SERIALIZER': 'json',
}
celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL'])
celery.conf.update(CELERY_CONFIG)
は、私は次のようにジョブを定義:
job = {
'task': 'my_function', # Name of a predefined function
'schedule': {'minute': 0, 'hour': 0} # crontab schedule
'args': [2, 3],
'kwargs': {}
}
私は、このようなデコレータを定義します。
def add_to_module(f):
setattr(current_module, 'tasks_{}__'.format(f.name), f)
return f
私の仕事は、あなたがURLにadd_task機能をリンクし、それらを得ることができ
@add_to_module
def my_function(x, y, **kwargs):
return x + y
はその後、この後フライ
def add_task(job):
logger.info("Adding periodic job: %s", job)
if not isinstance(job, dict) and 'task' in jobs:
logger.error("Job {} is ill-formed".format(job))
return False
celery.add_periodic_task(
crontab(**job.get('schedule', {'minute': 0, 'hour': 0})),
get_from_module(job['task']).s(
enterprise_id,
*job.get('args', []),
**job.get('kwargs', {})
),
name = job.get('name'),
expires = job.get('expires')
)
return True
def get_from_module(f):
return getattr(current_module, 'tasks_{}__'.format(f))
にタスクを追加する機能を追加することですあなたの現在のモジュールの機能からタスクを作成する
新しいタスクの実装を既存の実行中の作業者に追加することを前提とすると、これの後ろに?ブローカーを使用して、ワーカーを手動で再起動するのではなく、ワーカーにジョブを送信する必要があります。 – Jason
アプリケーションのユーザーは、スクリプトをロードし、実行スケジュールを定義できる必要があります。このようなタスクは1回だけ実行するのではなく(IIUCで説明したシナリオ)、たとえば5分ごとに実行します。 – ggaspar
作業者を手動で再起動することは、私が実際に避けたいものです – ggaspar