セロリはmultiprocessing.JoinableQueue
(またはgevent.queue.JoinableQueue
)に相当しますか?JoinableQueueのセロリ等価
私が探している機能は、キュー内のすべてのタスクが完了するのを待って、パブリッシャからセロリタスクキュー.join()
にアクセスできることです。
待ち行列がワーカー自身によって動的に満杯になるので、最初の待ちであるAsyncResult
またはGroupResult
は、十分ではありません。
セロリはmultiprocessing.JoinableQueue
(またはgevent.queue.JoinableQueue
)に相当しますか?JoinableQueueのセロリ等価
私が探している機能は、キュー内のすべてのタスクが完了するのを待って、パブリッシャからセロリタスクキュー.join()
にアクセスできることです。
待ち行列がワーカー自身によって動的に満杯になるので、最初の待ちであるAsyncResult
またはGroupResult
は、十分ではありません。
完璧ではないかもしれませんが、これは私が最終的に思いついたものです。
共有Redisカウンタとリストリスナーに基づいて、基本的には既存のCeleryキューの上にJoinableQueue
ラッパーです。キュー名はルーティングキーと同じにする必要があります(before_task_publish
とtask_postrun
信号の内部実装の詳細が原因です)。
joinableceleryqueue.py:
from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings
memdb = Redis.from_url(settings.REDIS_URL)
class JoinableCeleryQueue(object):
def __init__(self, queue):
self.queue = queue
self.register_queue_hooks()
def begin(self):
memdb.set(self.count_prop, 0)
@property
def count_prop(self):
return "jqueue:%s:count" % self.queue
@property
def finished_prop(self):
return "jqueue:%s:finished" % self.queue
def task_add(self, routing_key, **kw):
if routing_key != self.queue:
return
memdb.incr(self.count_prop)
def task_done(self, task, **kw):
if task.queue != self.queue:
return
memdb.decr(self.count_prop)
if memdb.get(self.count_prop) == "0":
memdb.rpush(self.finished_prop, 1)
def register_queue_hooks(self):
before_task_publish.connect(self.task_add)
task_postrun.connect(self.task_done)
def join(self):
memdb.brpop(self.finished_prop)
私は私が唯一のイベント(出版社)、「すべてのタスクが終了した」を聴いてのリスナーを必要とするよう代わりにパブリッシュ/サブスクライブのBRPOP
を使用することを選択しました。
JoinableCeleryQueue
を使用することは非常に簡単です - begin()
キューにタスクを追加する前に、通常のCelery API .join()
を使用してタスクを追加して、すべてのタスクが完了するまで待ちます。
あなたは 'group(* tasks)> apply> join'で十分ではないことをplsで明確にすることはできますか? 労働者はあなたのグループの結果にどのように影響しますか?なぜあなたは '.join'結果を扱うことができませんか? – Slam
@Slamこれは、タスクの最初のグループが完了するのを待つためです。ワーカーはグループ結果に影響を与えず、キューにさらにタスクを追加します。私の意図は、待ち行列内のすべてのタスクが完了するのを待つことです(待ち行列内のすべてのタスクで 'task_done()'が呼び出されるのを待つ 'JoinableQueue'に' join() 'できる方法に似ています) )。 Redisで追加の共有カウンタとpub/subを作成することでこれを達成できましたが、Celeryだけでよりクリーンで信頼性の高い方法があるのか疑問に思っていました。 –