2016-09-28 12 views
1

を返さないバックエンドよう:セロリRedisのは、常に私はセロリワーカーを実行している結果

-------------- [email protected] v3.1.23 (Cipater) 
---- **** ----- 
--- * *** * -- Linux-4.4.0-31-generic-x86_64-with-debian-stretch-sid 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   __main__:0x7fe76cd42400 
- ** ---------- .> transport: amqp:// 
- ** ---------- .> results:  redis:// 
- *** --- * --- .> concurrency: 4 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 
[tasks] 
    . tasks.mytask 

tasks.py

@celery_app.task(bind=True, ignore_result=False) 
def mytask(task): 
    r = redis.StrictRedis() 
    r.rpush('/task_finished', task.request.id) 
    return {'result': 42} 

私は次のコードを実行しようとすると、 2つ目のタスクを1つずつ実行し、最初の結果が得られたときには動作しますが、2つ目の結果を返すことはできません。

import celery.result 
import redis 

r = redis.StrictRedis() 
celery_app = Celery(name="my_long_task", backend="redis://") 

while True: 
    _, resp = r.blpop('/task_finished') 
    task_id = resp.decode('utf-8') 
    task = celery.result.AsyncResult(task_id, app=celery_app) 
    print(task) 
    print(task.result) 

が返されます:

まずループ:私はをインスタンス化した場合しかし、

[3] 8463cc46-0884-4bf7-b838-f0614f74b271 
[4] {} 

[1] 990e2d04-5664-4d7c-8a5c-e9cb4ef45e24 
[2] {'result': 42} 

セカンドループを(結果を返すことができません)それは毎回動くでしょう。
celery_appを再インスタンス化しないと何が問題になりますか?私は何が欠けていますか?

編集:

(レイテンシの場合)結果のビットを待っているが、あまりにも

while True: 
    _, resp = r.blpop('/task_finished') 
    task_id = resp.decode('utf-8') 
    for i in range(0, 20): 
     # Won't work because I need to re instantiate celery_app 
     task = celery.result.AsyncResult(task_id, app=celery_app) 
     print(task.result) 
     time.sleep(1) 

答えて

0

あなたは競合状態を持って動作しません。これは何が起こるかです:

  1. ループが_, resp = r.blpop('/task_finished')とブロックに到着します。

  2. タスクは、r.rpush('/task_finished', task.request.id)

  3. ループがブロック解除実行task = celery.result.AsyncResult(task_id, app=celery_app)を実行し、タスクがまだデータベースにその結果を記録していないため、空の結果を得ます。

方法があるかもしれませんセロリは、バックエンドに結果を犯した後r.rpushを行います。おそらく、Taskから派生したカスタムクラスを作成することになります。しかし、それは私が試したことではありません。

ただし、結果コードを一緒にのタスクIDとともに保存するようにコードを変更することはできます。次のようなものがあります。

説明のためにJSONシリアライズを使用しました。あなたはどんなスキームを使ってもかまいません。読み:これにより

_, resp = r.blpop('/task_finished') 
resp = json.loads(resp) 

、あなたはignore_result=Trueignore_result=Falseを変更する場合があります。

+0

'celery_app'を再インスタンス化しないと、' task = celery.result.AsyncResult(task_id、app = celery_app) 'の回りに' forループ 'と' sleep'があっても動作しません10秒間待つことを試みた)。 結果を自分で管理する方が簡単だと私は同意しますが、私はこの問題の背後にあるメカニズムを理解しようとします – Orelus

関連する問題