2017-03-04 17 views
0

私はCeleryタスクでスレッドのプールを実装しようとしています。CeleryタスクへのPythonマルチスレッド。 celery_task.update_state()エラー

My Celeryタスクはタスク状態に関する情報をDBに送るupdate_state()関数を呼び出します。そして正常に動作します。 しかし、タスクにスレッドを追加していて、各スレッドでupdate_state()関数を呼び出そうとすると、Celeryはエラーを返します。

この

は(スレッディングなし)の例を働いている:

import celery 

@celery.task(bind=True) 
def get_info(self, user): 
    for i in xrange(4): 
     self.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 

これに動作していない例を(スレッドで):

import celery 
import threading 

lock = threading.Lock() 

def run_subtask(celery_task, i): 
    lock.acquire() 
    #Error raises here, when update_state calls 
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i}) 
    lock.release() 

@celery.task(bind=True) 
def get_info(self, user): 

    for i in xrange(4): 
     worker = threading.Thread(target=run_subtask, args=(self, i)) 
     worker.start() 

エラーは次のとおりです。

[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", 
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key), 
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found 

は何ですか理由?なぜ私はthreadにupdate_state()を呼び出せないのですか?

答えて

0

回答が見つかりました。

task.requestはスレッドローカルなので、タスクを実行しているスレッドだけがupdate_stateを呼び出すことができます。

これは、スレッドが結果を格納しているタスクポストハンドラと競合できると考えると、特に意味があります。

あなたがスレッドにTASK_IDを渡すことができます。

cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to}) 

しかし、あなたは、スレッドが参加しているいまいましいを確認する必要があり、タスクが終了する前に停止(thread.join())。 あなたの例では、スレッドはwhileループが終了した後にのみ参加でき、1秒間スリープしているので、参加はそれだけ遅延される可能性があります。

1

セロリはコンテキストオブジェクトの種類をスレッドに追加し、関連するタスクを認識します。スレッドをタスクに関連付けるには、次のようなものを実行する必要があります。

from celery.app import push_current_task 


def run_subtask(celery_task, i): 
    push_current_task(celery_task) 

    ... 

    pop_current_task() 
+0

ありがとうございました!私はすでに解決策を見つけてここに掲示しました。私はあなたのソリューションを検証しませんでしたが、私はそれが同じ方向性を持っていると思います。 – Denti

+0

マインはドキュメントが推奨する方法です。私は私の使用をお勧めします。 –

関連する問題