2016-09-08 13 views
7

セロリはmultiprocessing.JoinableQueue(またはgevent.queue.JoinableQueue)に相当しますか?JoinableQueueのセロリ等価

私が探している機能は、キュー内のすべてのタスクが完了するのを待って、パブリッシャからセロリタスクキュー.join()にアクセスできることです。

待ち行列がワーカー自身によって動的に満杯になるので、最初の待ちであるAsyncResultまたはGroupResultは、十分ではありません。

+0

あなたは 'group(* tasks)> apply> join'で十分ではないことをplsで明確にすることはできますか? 労働者はあなたのグループの結果にどのように影響しますか?なぜあなたは '.join'結果を扱うことができませんか? – Slam

+0

@Slamこれは、タスクの最初のグループが完了するのを待つためです。ワーカーはグループ結果に影響を与えず、キューにさらにタスクを追加します。私の意図は、待ち行列内のすべてのタスクが完了するのを待つことです(待ち行列内のすべてのタスクで 'task_done()'が呼び出されるのを待つ 'JoinableQueue'に' join() 'できる方法に似ています) )。 Redisで追加の共有カウンタとpub/subを作成することでこれを達成できましたが、Celeryだけでよりクリーンで信頼性の高い方法があるのか​​疑問に思っていました。 –

答えて

0

完璧ではないかもしれませんが、これは私が最終的に思いついたものです。

共有Redisカウンタとリストリスナーに基づいて、基本的には既存のCeleryキューの上にJoinableQueueラッパーです。キュー名はルーティングキーと同じにする必要があります(before_task_publishtask_postrun信号の内部実装の詳細が原因です)。

joinableceleryqueue.py

from celery.signals import before_task_publish, task_postrun 
from redis import Redis 
import settings 

memdb = Redis.from_url(settings.REDIS_URL) 

class JoinableCeleryQueue(object): 
    def __init__(self, queue): 
     self.queue = queue 
     self.register_queue_hooks() 

    def begin(self): 
     memdb.set(self.count_prop, 0) 

    @property 
    def count_prop(self): 
     return "jqueue:%s:count" % self.queue 

    @property 
    def finished_prop(self): 
     return "jqueue:%s:finished" % self.queue 

    def task_add(self, routing_key, **kw): 
     if routing_key != self.queue: 
      return 

     memdb.incr(self.count_prop) 

    def task_done(self, task, **kw): 
     if task.queue != self.queue: 
      return 

     memdb.decr(self.count_prop) 
     if memdb.get(self.count_prop) == "0": 
      memdb.rpush(self.finished_prop, 1) 

    def register_queue_hooks(self): 
     before_task_publish.connect(self.task_add) 
     task_postrun.connect(self.task_done) 

    def join(self): 
     memdb.brpop(self.finished_prop) 

私は私が唯一のイベント(出版社)、「すべてのタスクが終了した」を聴いてのリスナーを必要とするよう代わりにパブリッシュ/サブスクライブのBRPOPを使用することを選択しました。

JoinableCeleryQueueを使用することは非常に簡単です - begin()キューにタスクを追加する前に、通常のCelery API .join()を使用してタスクを追加して、すべてのタスクが完了するまで待ちます。