セールスワーカーにカスタムコンシューマーを追加するのは現在難しいですが、コンシューマーのブートステップのサポートが追加された開発バージョン(3.1になります)ではこれが変更されています。
あり、私はちょうどそれを実装し終えたとして何のドキュメントはまだませんが、ここでは例です:APIは、最終版では、私はまだわからない一つのことを変更することが
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body,))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
注 約get_consumer(connection)
の後のチャンネルの扱い方です 現在、コンシューマのチャネルは接続が失われたときに閉じられ、シャットダウン時には になりますが、手動でチャネルを処理したい場合があります。その場合、常にConsumerStepをカスタマイズするか、新しいStartStopStepを作成する可能性があります( )。
ドキュメントはhttp://celery.readthedocs.org/en/latest/userguide/extending.htmlにあります。 –