2012-10-01 6 views
9

私は、いくつかのJavaアプリケーションがさまざまなチャンネルの文字列JSONオブジェクトとしてログ出力メッセージを使用している既存のRabbitMQデプロイメントを使用しています。これらのメッセージを消費し、さまざまな場所(DB、Hadoopなど)に書き込むためにセロリを使用したいと思います。既存のRabbitMQメッセージでCeleryを使用する

Celeryは、メッセージが配信される仕組みを隠そうとするため、RabbitMQメッセージのプロデューサとコンシューマの両方になるように設計されています。セロリに別のアプリで作成されたメッセージを消費させて、到着時にジョブを実行させるにはどうしてですか?

答えて

12

セールスワーカーにカスタムコンシューマーを追加するのは現在難しいですが、コンシューマーのブートステップのサポートが追加された開発バージョン(3.1になります)ではこれが変更されています。

あり、私はちょうどそれを実装し終えたとして何のドキュメントはまだませんが、ここでは例です:APIは、最終版では、私はまだわからない一つのことを変更することが

from celery import Celery 
from celery.bin import Option 
from celery.bootsteps import ConsumerStep 
from kombu import Consumer, Exchange, Queue 

class CustomConsumer(ConsumerStep): 
    queue = Queue('custom', Exchange('custom'), routing_key='custom') 

    def __init__(self, c, enable_custom_consumer=False, **kwargs): 
     self.enable = self.enable_custom_consumer 

    def get_consumers(self, connection): 
     return [ 
      Consumer(connection.channel(), 
       queues=[self.queue], 
       callbacks=[self.on_message]), 
     ] 

    def on_message(self, body, message): 
     print('GOT MESSAGE: %r' % (body,)) 
     message.ack() 


celery = Celery(broker='amqp://localhost//') 
celery.steps['consumer'].add(CustomConsumer) 
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true', 
      help='Enable our custom consumer.'), 
) 

注 約get_consumer(connection)の後のチャンネルの扱い方です 現在、コンシューマのチャネルは接続が失われたときに閉じられ、シャットダウン時には になりますが、手動でチャネルを処理したい場合があります。その場合、常にConsumerStepをカスタマイズするか、新しいStartStopStepを作成する可能性があります( )。

+3

ドキュメントはhttp://celery.readthedocs.org/en/latest/userguide/extending.htmlにあります。 –

関連する問題