2017-12-06 14 views
2

は、それが動的にすることが可能であるセロリに定期的にタスクを追加し、定期的なタスクを追加しますか?私はフラスコを使用していは動的セロリ

、ジャンゴ、と私は、ユーザーがWebインタフェースを通じて再発タスクを定義することができるようになっているアプリを構築していません。

Celery 4.1の定期タスクを使用しようとしましたが、新しいタスクを追加しようとしましたが、セロリサーバーを停止して、設定を変更してください。おそらく、(再起動する必要なしに)設定を動的に読み込む方法がありますか?私はcrontabの持っていると考えられてきました

すべての5minsセロリのサービスを再起動します。それは非常にコントラネイティブなようです。私がセロリを使いたかった理由は、crontabを使わないことでした。

誰もがこれにライトを持っていますか?

PS:私はanother similar questionの承知しているが、それはつまりV4.1これは、セロリのために働く

+0

新しいタスクの実装を既存の実行中の作業者に追加することを前提とすると、これの後ろに?ブローカーを使用して、ワーカーを手動で再起動するのではなく、ワーカーにジョブを送信する必要があります。 – Jason

+0

アプリケーションのユーザーは、スクリプトをロードし、実行スケジュールを定義できる必要があります。このようなタスクは1回だけ実行するのではなく(IIUCで説明したシナリオ)、たとえば5分ごとに実行します。 – ggaspar

+0

作業者を手動で再起動することは、私が実際に避けたいものです – ggaspar

答えて

0

のビートの導入で、それ以来変わっていた私は物事を期待していた2012年からだ4.0.1+そして、Python 2.7、およびRedisの

from celery import Celery 
import os, logging 
logger = logging.getLogger(__name__) 
current_module = __import__(__name__) 

CELERY_CONFIG = { 
    'CELERY_BROKER_URL': 
    'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')), 
    'CELERY_TASK_SERIALIZER': 'json', 
} 


celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL']) 
celery.conf.update(CELERY_CONFIG) 

は、私は次のようにジョブを定義:

job = { 
    'task': 'my_function',    # Name of a predefined function 
    'schedule': {'minute': 0, 'hour': 0} # crontab schedule 
    'args': [2, 3], 
    'kwargs': {} 
} 

私は、このようなデコレータを定義します。

def add_to_module(f): 
    setattr(current_module, 'tasks_{}__'.format(f.name), f) 
    return f 

私の仕事は、あなたがURLにadd_task機能をリンクし、それらを得ることができ

@add_to_module 
def my_function(x, y, **kwargs): 
    return x + y 

はその後、この後フライ

def add_task(job): 
    logger.info("Adding periodic job: %s", job) 
    if not isinstance(job, dict) and 'task' in jobs: 
     logger.error("Job {} is ill-formed".format(job)) 
     return False 
    celery.add_periodic_task(
     crontab(**job.get('schedule', {'minute': 0, 'hour': 0})), 
     get_from_module(job['task']).s(
      enterprise_id, 
      *job.get('args', []), 
      **job.get('kwargs', {}) 
     ), 
     name = job.get('name'), 
     expires = job.get('expires') 
    ) 
    return True 


def get_from_module(f): 
    return getattr(current_module, 'tasks_{}__'.format(f)) 

にタスクを追加する機能を追加することですあなたの現在のモジュールの機能からタスクを作成する