2012-08-09 29 views
8

私はタスクを呼び出してそのタスクのキューを作成しようとしています。存在しなければ、直ちに呼び出されたタスクをそのキューに挿入しようとしています。私は、次のコードを持っている:セロリの動的キューの作成とルーティング

@task 
def greet(name): 
    return "Hello %s!" % name 


def run(): 
    result = greet.delay(args=['marc'], queue='greet.1', 
     routing_key='greet.1') 
    print result.ready() 

、私はカスタムルーターを持っている:

class MyRouter(object): 

    def route_for_task(self, task, args=None, kwargs=None): 
     if task == 'tasks.greet': 
      return {'queue': kwargs['queue'], 
        'exchange': 'greet', 
        'exchange_type': 'direct', 
        'routing_key': kwargs['routing_key']} 
     return None 

これは交換がgreet.1と呼ばれ、キューがgreet.1と呼ばれるが、キューが空で作成されます。この交換は、greet.1のようなルーティングキーをgreet.1というキューにルーティングする方法を知っているgreetと呼ばれるだけです。

アイデア?

答えて

13

あなたは次のようにします。

task.apply_async(queue='foo', routing_key='foobar') 

その後セロリは、CELERY_QUEUESに「foo」でキューからデフォルト値を取る か、それが存在しない場合は、自動的に(キュー= FOO、交換を使用して作成します。 = FOO、routing_key = FOO ')fooは' CELERY_QUEUESに存在しない場合

は、だから、で終わるだろう:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo') 

プロデューサーが続いていることを宣言しますあなたがrouting_keyを上書きするため、キューは、しかし、 は実際にこれは奇妙に思えるかもしれrouting_key = 'foobar'

を使用してメッセージを送信するが、動作は、別のトピックに公開する話題交換、 ために実際に有用です。

あなたが望むことを行うのは難しいですが、自分でキューを作成して と宣言できますが、自動メッセージ発行の再試行ではうまく動作しません。 apply_asyncのqueue引数が カスタムkombu.Queueをサポートし、その代わりに宣言されて宛先として使用される場合は、より良いでしょう。 http://github.com/celery/celery/issues

+0

私はキューを手動で作成するのではなく、キューを作成して自動的に交換する新しいワーカーを生成して、自分の問題をより理にかなったものにすることをやめました。いつものように、返事をありがとう。 :) – Marconi

関連する問題