2017-05-15 15 views
1

私はCeleryを使ってDjangoアプリケーションから非同期タスクを処理しています。ほとんどのタスクは短く、数秒で実行されますが、私は数時間かかることがある1つのタスクがあります。セロリのタスクをタスク名で制限する方法は?

私のサーバーでの処理の制限により、Celeryは2つのタスクを一度に実行するように設定されています。つまり、誰かが長時間実行している2つのタスクを起動すると、数時間にわたり他のすべてのセロリ処理サイトを効果的にブロックすることになり、非常に悪いことです。

一度に1つのタイプのタスクしか処理しないようにセロリを構成する方法はありますか?ような何か:

@task(max_running_instances=1) 
def my_really_long_task(): 
    for i in range(1000000000): 
     time.sleep(6000) 

注、私はmy_really_long_taskの他のすべての起動を中止する必要はありません。私はすぐにそれらを始めることを望んでおらず、同じ名前の他のすべての仕事が終わると始めるだけです。

これはCeleryによってサポートされていないようだから、私の現在のハック解決策は、タスク内の他のタスクを照会して、他の実行中のインスタンスを見つけたら、後で実行するようにスケジュールを変更します。

from celery.task.control import inspect 

def get_all_active_celery_task_names(ignore_id=None): 
    """ 
    Returns Celery task names for all running tasks. 
    """ 
    i = inspect() 
    task_names = defaultdict(int) # {name: count} 
    if i: 
     active = i.active() 
     if active is not None: 
      for worker_name, tasks in i.active().iteritems(): 
       for task in tasks: 
        if ignore_id and task['id'] == ignore_id: 
         continue 
        task_names[task['name']] += 1 
    return task_names 

@task 
def my_really_long_task(): 

    all_names = get_all_active_celery_task_names() 
    if 'my_really_long_task' in all_names: 
     my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300)) 
     return 

    for i in range(1000000000): 
     time.sleep(6000) 

これを行うより良い方法はありますか?

私はthisのような他のハックの解決策を知っていますが、タスクの一意性を追跡するために別のmemcacheサーバーを設定することは、上記の方法よりも信頼性が低く、複雑です。

+0

私はTaskLockというDjangoテーブルを追加し、タスクロックをチェック/呼び出し/解放するために 'with tasklock(self)'式を使用しました。私はまた、よりハッキリではないものを見たいと思っています... – rrauenza

+0

ロックに有効期限がある場合は、あなたのタスクでtime_limitも使用するようにしてください... – rrauenza

答えて

1

もう1つの解決方法は、my_really_long_taskを別のキューに入れることです。

my_really_long_task.apply_async(*args, queue='foo') 

次に、並行性が1のワーカーを開始してこれらのタスクを消費し、一度に1つのタスクしか実行されないようにします。

celery -A foo worker -l info -Q foo 
+0

これは少しdevopsのオーバーヘッドが必要ですが、クリーンなソリューションのようです。ありがとう。 – Cerin

関連する問題