2016-07-05 10 views
0

私はセロリのタスクをユニットテストしています。 チェーンタスクもグループを持っているので、コードが生成されます。Pythonセロリ - コード内のすべてのサブタスクを待つ方法

テストは次のようになります。

  • 実行セロリタスク(遅延)
  • をタスクとすべてのサブタスク
  • アサート

待ち、私は次のことを試してみました:

def wait_for_result(result): 
    result.get() 
    for child in result.children or list(): 
     if isinstance(child, GroupResult): 
      # tried looping over task result in group 
      # until tasks are ready, but without success 
      pass 
     wait_for_result(child) 

これにより、deアドロック、chord_unlockは永遠に再試行されます。 私は仕事の結果に興味がありません。 すべてのサブタスクが終了するのを待つ方法はありますか?

答えて

0

これは古い質問ですが、私はちょうど私はそれが誰かを助け念のために、デッドロックの問題を処分した方法を共有したいと思いました。

セロリのログのように、get()をタスク内で使用しないでください。これは確かにデッドロックを引き起こします。

私はグループタスクのチェーンを含むセロリタスクの同様のセットを持っているので、それを和音にしています。私は、HTTPリクエストを行うことによって、これらのタスクを竜巻を使用して呼び出しています。私がやったことは、このようなものだったので:celeryTask()は竜巻によって呼び出されている場合は

@task 
def someFunction(): 
    .... 


@task 
def someTask(): 
    .... 


@task 
def celeryTask(): 
    groupTask = group([someFunction.s(i) for i in range(10)]) 

    job = (groupTask| someTask.s()) 

    return job 

、チェーンが実行を開始します、& someTask()のUUIDがjobで開催されます。それは次のようになります

AsyncResult:765b29a8-7873-4b28-B05C-7e19c33e950c

このUUIDが返され、celeryTask()終了してもチェーンが故にためのスペースを残して、(理想的には)実行を開始する前に実行する別のプロセス。

その後、トルネード層を使用してタスクのステータスを確認しました。竜巻層の詳細はこちらにありますstackoverflow question

関連する問題