私はCeleryタスクでスレッドのプールを実装しようとしています。CeleryタスクへのPythonマルチスレッド。 celery_task.update_state()エラー
My Celeryタスクはタスク状態に関する情報をDBに送るupdate_state()関数を呼び出します。そして正常に動作します。 しかし、タスクにスレッドを追加していて、各スレッドでupdate_state()関数を呼び出そうとすると、Celeryはエラーを返します。
この
は(スレッディングなし)の例を働いている:import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
これに動作していない例を(スレッドで):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
エラーは次のとおりです。
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
は何ですか理由?なぜ私はthreadにupdate_state()を呼び出せないのですか?
ありがとうございました!私はすでに解決策を見つけてここに掲示しました。私はあなたのソリューションを検証しませんでしたが、私はそれが同じ方向性を持っていると思います。 – Denti
マインはドキュメントが推奨する方法です。私は私の使用をお勧めします。 –