2012-05-22 7 views
6

私はDjangoとCeleryを使用していますが、ルーティングを複数のキューに設定しようとしています。タスクrouting_keyexchange(タスクデコレータまたはapply_async())を指定すると、タスクはブローカー(私のMySQLデータベースに接続しているKombu)に追加されません。Django&Celery - ルーティングの問題

タスクデコレータでキュー名を指定すると(ルーティングキーは無視されます)、タスクは正常に動作します。これはルーティング/交換の設定に問題があるようです。

どのような問題が発生する可能性がありますか?ここで

はセットアップです:

settings.py

INSTALLED_APPS = (
    ... 
    'kombu.transport.django', 
    'djcelery', 
) 
BROKER_BACKEND = 'django' 
CELERY_DEFAULT_QUEUE = 'default' 
CELERY_DEFAULT_EXCHANGE = "tasks" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "task.default" 
CELERY_QUEUES = { 
    'default': { 
     'binding_key':'task.#', 
    }, 
    'i_tasks': { 
     'binding_key':'important_task.#', 
    }, 
} 

tasks.py

from celery.task import task 

@task(routing_key='important_task.update') 
def my_important_task(): 
    try: 
     ... 
    except Exception as exc: 
     my_important_task.retry(exc=exc) 

は、タスクを開始します

from tasks import my_important_task 
my_important_task.delay() 
+0

をあなたはrouting_keyを渡すにはどうすればよいの? async_applyを使用していますか? – mher

+0

私は 'delay()'メソッドを使用しています。これは 'apply_async()'のショートカットです。私は、 'routing_key'仕様を、(デコレータを介して)それが呼び出されたときではなく、タスクメソッドで保持しようとしています。代わりに 'apply_async()'を使ってキーを渡そうとしましたが、私は同じ問題を抱えています。 –

+0

遅延はrouting_keyキーワードを受け入れません。これはapply_asyncの簡略化されたバージョンですが、同じではありません。 – mher

答えて

43

あなたはrouting_keyでこのタスクを適用するときにあなたがそう(http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparisonで、見つけるのは、inarguablyハード輸送比較表を参照)の宣言は、専用メモリ に格納されている意味ブローカーとして

をDjangoのORMを使用していますimportant_task.updateキューをまだ宣言していないため、 はルーティングできません。

あなたがこれを行う場合、それは動作します:

@task(queue="i_tasks", routing_key="important_tasks.update") 
def important_task(): 
    print("IMPORTANT") 

をしかし、あなたは自動ルーティング機能を使用することがはるかに簡単になり、 あなたは 'を使用する必要があります示し、ここでは何もありませんので、

  • :設定を削除するだけでを自動ルーティングを使用するトピックのExchange、 210、
  • CELERY_DEFAULT_EXCHANGE
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

そして、このようにあなたのタスクを宣言:そのキューからのかかる作業員を開始する

@task(queue="important") 
def important_task(): 
    return "IMPORTANT" 

、その後:

$ python manage.py celeryd -l info -Q important 

またはデフォルト(celery)キューとimportantキューの両方から消費する:

$ python manage.py celeryd -l info -Q celery,important 

もう一つの良い方法は タスクにキュー名をハードコーディングし、代わりにCELERY_ROUTESを使用しないことです:

@task 
def important_task(): 
    return "DEFAULT" 

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}} 

それでもトピック交換を使用して主張なら、あなたは 自動的にすべてのキューにタスクが送信された最初の時間 宣言するために、このルータを追加することができます。

class PredeclareRouter(object): 
    setup = False 

    def route_for_task(self, *args, **kwargs): 
     if self.setup: 
      return 
     self.setup = True 
     from celery import current_app, VERSION as celery_version 
     # will not connect anywhere when using the Django transport 
     # because declarations happen in memory. 
     with current_app.broker_connection() as conn: 
      queues = current_app.amqp.queues 
      channel = conn.default_channel 
      if celery_version >= (2, 6): 
       for queue in queues.itervalues(): 
        queue(channel).declare() 
      else: 
       from kombu.common import entry_to_queue 
       for name, opts in queues.iteritems(): 
        entry_to_queue(name, **opts)(channel).declare() 
CELERY_ROUTES = (PredeclareRouter(),) 
+0

説明をありがとう! –

+2

この問題は、Celery 3で解決されたキュー宣言と交換で問題になりますか?私は設定で新しい 'CELERY_QUEUES =(Queue(...)、...)'を使用しています、これはキューが正しく宣言されていることを意味しますか? –

+0

注:CELERY_ROUTESはCELERY_TASK_ROUTESに置き換えられました。誰かの時間を節約するかもしれない。 –