2013-02-19 9 views
9

ときに私のルートそれが動作する特定のキューにタスク:セロリの特定のキューに一連のタスクをルーティングする方法は?

task.apply_async(queue='beetroot') 

しかし、私はチェーンを作成する場合:

chain = task | task 

そしてその後、私は

chain.apply_async(queue='beetroot') 

無視しているようだ書きますqueueキーワードを指定し、デフォルトの 'celery'キューに割り当てます。

セロリがチェーン内でルーティングをサポートしていれば、すべてのタスクが同じキュー内で順番に実行されるといいでしょう。

答えて

10

私はこれを理解しました。

サブタスクの定義:

from celery import subtask 

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot') 

部分:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot') 

あなたはサブタスクの定義に、または部分的に通過=キュー=やカウントダウンなどの必要な実行オプションを追加する必要があります

次に、チェーンを実行します。

chain.apply_async() 

または、

chain.delay() 

とタスクは、 'ビートルート' キューに送信されます。この最後のコマンドの余分な実行引数は何もしません。 Chain(またはGroup、または他のCanvasプリミティブ)レベルでこれらの実行引数をすべて適用するのは素晴らしいことでした。

+2

うーん、部分的な例では、私のために動作しなかったことを、私は次のエラーを返しました:TypeError例外:サポートされていないオペランドのタイプ(複数可)について|「AsyncResult」と 'AsyncResult'(3.0を使用します。23) – Clara

+0

2番目のタスクを実行するために 'チェーン 'を取得しようとする際に、私自身の問題がありました。質問:両方のタスクで 'apply_async'を呼び出す場合、それは本当にチェーンですか?両方のタスクが独自に協調して実行されるのではないでしょうか? 私の場合、最初のサブタスクが2番目のサブタスクによって使用される値を返すため、構文を試してみましたが失敗しました。 – PritishC

12

私はこのようにそれを行う:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue) 
mychain = celery.chain(subtask, subtask2, ...) 
mychain.apply_async() 
+0

それは 'queue'が署名に指定されていて、' apply_async'に渡されたときは有効ではありませんか?この機能のための良い文書があるかどうか知っていますか? – dashesy

+0

同じチェーン内の異なるサブタスクに異なるキューを割り当てることはできますか? – ForeverWintr

3

これはかなり遅れているが、私は@mpafが提供するコードが完全に正しいとは思いません。

コンテキスト:私の場合、2つのサブタスクがありますが、そのうちの最初のサブタスクは入力引数として2番目のサブセットに渡されます。 2番目のタスクを実行する際に問題が発生しました。セロリがセカンドタスクを最初のコールバックとして認識しても、2番目のタスクは決して実行しないことがログでわかりました。

これは私の非稼働チェーンコードだった - :@のmpafの答えで提供構文を使用して

from celery import chain 

chain(
    module.task1.s(arg), 
    module.task2.s() 
).apply_async(countdown=0.1, queue='queuename') 

、私は実行するために、両方のタスクを得たが、実行順序がでたらめだったと第二サブタスクが確認されませんでした最初のコールバックとして。サブタスクに明示的にキューを設定する方法について、ドキュメントを参照するアイデアがあります。

これは、作業コードです - :

chain(
    module.task1.s(arg).set(queue='queuename'), 
    module.task2.s().set(queue='queuename') 
).apply_async(countdown=0.1) 
+0

私のために働いた仲間 – phacic

関連する問題