私はtask_postrunシグナルのraisin SystemExit()によってメインセロリプロセスをシャットダウンしようとしています。信号はちょうどうまく発射され、例外は発生しますが、ワーカーは決して完全に終了せず、ただそこにハングします。セロリはtask_postrunシグナルでSystemExitを発生させてシャットダウンしようとしていますが、常にハングアップしてメインプロセスは終了しません
この作業を行うにはどうすればよいですか?
どこかの設定が忘れていますか?
以下は、私が労働者のために使っているコード(worker.py)です:
from celery import Celery
from celery import signals
app = Celery('tasks',
set_as_current = True,
broker='amqp://[email protected]//',
backend="mongodb://localhost//",
)
app.config_from_object({
"CELERYD_MAX_TASKS_PER_CHILD": 1,
"CELERYD_POOL": "solo",
"CELERY_SEND_EVENTS": True,
"CELERYD_CONCURRENCY": 1,
"CELERYD_PREFETCH_MULTIPLIER": 1,
})
def shutdown_worker(**kwargs):
print("SHUTTING DOWN WORKER (RAISING SystemExit)")
raise SystemExit()
import tasks
signals.task_postrun.connect(shutdown_worker)
print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")
以下tasks.pyのためのコードです:
from celery import Celery,task
from celery.signals import task_postrun
import time
from celery.task import Task
class Blah(Task):
track_started = True
acks_late = False
def run(config, kwargs):
time.sleep(5)
return "SUCCESS FROM BLAH"
def get_app():
celery = Celery('tasks',
broker='amqp://[email protected]//',
backend="mongodb://localhost//"
)
return celery
はこれをテストするには、
>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))
出力:私が最初にすることは、労働者のコード(python worker.py
)を実行され、その後、私はそうのようなタスクをキュー私は労働者から見るこれです:
STARTING WORKER
-------------- [email protected] v3.0.9 (Chiastic Slide)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://[email protected]:5672//
- ** ---------- . app: tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events: ON
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-11-06 15:59:16,761: WARNING/MainProcess] [email protected] has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)
私は、Pythonのコードが完全に終了するプロセスのために、その後app.worker_main()
への呼び出しから復帰した後、WORKER EXITED
を印刷すると期待していました。 。
私が作るにはどうすればよい:これが起こることはありませんし、ワーカープロセスは、それが離れて行くのkill -KILL {PID}
EDなければならない(それがいずれかの他のタスクを消費することはできません
私は私の主な質問は以下のようになり推測しますapp.worker_main()
からコードRETURN?
私は、タスクのX
数が実行された後(プロセスCOMPLETELYの出口を持つことで)完全にワーカープロセスを再起動することができるようにしたいと思います。
UPDATE私は、労働者がぶら下がっているものを考え出した - それはSystemExit
例外をキャッチした後、作業員(WorkController
は)self.stop
への呼び出しにぶら下がっています。
現在のバージョンのCeleryでは、オプションがCELERY_DISABLE_RATE_LIMITSに変更されました。デフォルト値はCELERY_DEFAULT_RATE_LIMITで定義されています。変更しない場合は制限はありません。 –