2017-09-04 12 views
1

期待通りに動作していないと私は、しかし、それが期待どおりに動作しない特定のキューに自分のタスクを割り当てるセロリタスクルートは、私はセロリを練習しています

マイ__init__.py

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

マイcelery_config.py

amqp = 'amqp://guest:[email protected]:5672//' 
broker_url = amqp 
result_backend = amqp 

task_routes = ([ 
    ('import_feed', {'queue': 'queue_import_feed'}) 
]) 

マイtasks.py

from . import app 

@app.task(name='import_feed') 
def import_feed(): 
    pass 

私は、ワーカーの実行方法:

celery -A subscriber1.tasks worker -l info 

私のクライアントの__init__.py

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

私のクライアントのcelery_config.py

from kombu.common import Broadcast 

amqp = 'amqp://guest:[email protected]:5672//' 
BROKER_URL = amqp 
CELERY_RESULT_BACKEND = amqp 

はその後、私のクライアントのシェルに私が試した:

from publisher import app 
result = app.send_task('import_feed') 

その後、私の労働者が仕事を得ましたか私はそれを特定のキューに割り当てたので、私はそれをしてはいけません。私は、ルーティングの部分で何かを誤解のように、私は最初の1

result = app.send_task('import_feed', queue='queue_import_feed') 

に代わりに受け取っていることを期待し、以下のコマンドと全くタスクが私の労働者によって受信されていないが思え、私のクライアントで試してみました。しかし、私が本当に望んでいるのは、import_feedタスクは、タスクを送信するときにqueue_import_feedキューが指定されている場合にのみ実行されます。

答えて

1

ワーカーが処理するデフォルトのキューを変更できます。

app.send_task('import_feed')は、タスクをceleryキューに送信します。

app.send_task('import_feed', queue='queue_import_feed')queue_import_feedにタスクを送信しますが、作業者はceleryキュー内のタスクのみを処理しています。

は、労働者は、それがキューに発行されています場合にのみ、 import_feedタスクに反応するよう send_task上の制約を配置するために使用し、 -Qスイッチ

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed' 

編集

を特定のキューを処理するにはsend_taskCeleryに上書きする必要があります。AMQPdefault_queueNoneに設定してカスタムを提供する必要があります。

リアクター。PY

from celery.app.amqp import AMQP 
from celery import Celery 

class MyCelery(Celery): 
    def send_task(self, name=None, args=None, kwargs=None, **options): 
     if 'queue' in options: 
      return super(MyCelery, self).send_task(name, args, kwargs, **options) 


class MyAMQP(AMQP): 
    default_queue = None 

celery_config.py

from kombu import Exchange, Queue 

... 

task_exchange = Exchange('default', type='direct') 
task_create_missing_queues = False 

task_queues = [ 
    Queue('feed_queue', task_exchange, routing_key='feeds'), 
] 

task_routes = { 
    'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'} 
} 

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP') 
+0

申し訳ありませんが、それは本当に質問に答えませんでした。私はそれがそうすることができることを知っています。私はちょうど私の最後の声明が起こるようにしたい –

+0

あなたの最後の声明は、キューが指定されている場合にのみ、特定のタスクを実行するよう求めています。 [自動ルーティング](http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-create-missing-queues)を無効にして、このようなタスクが ' celery' queue –

+0

私が望むのは、 'celery -A subscriber1.tasks worker -l info'を実行してワーカーを実行しているキューを指定しないで、' import_feed'タスクがタスクの公開時に反応するように制限します。 'queue_import_feed'キュー。それは不可能ですか? –

関連する問題