2016-05-13 5 views
3

experimentという名前のRabbitMQトピック交換があります。ルーティングキーが「foo」で始まるすべてのメッセージと、ルーティングキーが「bar」で始まるすべてのメッセージを受信するコンシューマーを構築しています。Kombu ConsumerMixinを使用して、複数のバインディングを宣言する方法は?

RabbitMQドキュメントによると、管理UIで私自身の実験に基づいて、1つの交換、1つのキュー、2つのバインディング(foo.#およびbar.#)をそれらを接続することが可能でなければなりません。

私はKombuのConsumerMixinを使ってこれを表現する方法を理解できません。

q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#']) 

...しかし、それはまったく好きではありません。

q.bind_to(exchange=exchange, routing_key='foo.#') 
q.bind_to(exchange=exchange, routing_key='bar.#') 

を...しかし、私がしようとするたびに私が手::私も試してみた私は鬣の意味を推測

kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel 

を...。しかし、ミックスインのインターフェイスでは、チャンネルにバインドされたキューに簡単にフックできる場所が見つかりません。

from kombu import Connection, Exchange, Queue 
from kombu.mixins import ConsumerMixin 


class Worker(ConsumerMixin): 
    exchange = Exchange('experiment', type='topic') 
    q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True) 

    def __init__(self, connection): 
     self.connection = connection 

    def get_consumers(self, Consumer, channel): 
     return [Consumer(queues=[self.q], callbacks=[self.on_task])] 

    def on_task(self, body, message): 
     print body 
     message.ack() 


if __name__ == '__main__': 
    with Connection('amqp://guest:[email protected]:5672//') as conn: 
     worker = Worker(conn) 
     worker.run() 

...動作しますが、私だけfooメッセージを与える:ここではベース(作業)コードです。興味のあるルーティングキーごとに新しいキューを作成し、すべてをコンシューマに渡す以外に、これを行うためのきれいな方法はありますか?

答えて

5

少し掘り下げた後、これを達成する方法を見つけました。これは、私が持っていた最初のアイデアにかなり近いです。キューにrouting_key文字列を渡す代わりに、bindingsリストを渡します。リストの各要素は、交換とルーティングキーを指定するbindingオブジェクトのインスタンスです。

例は千個の言葉の価値がある:

from kombu import Exchange, Queue, binding 

exchange = Exchange('experiment', type='topic') 
q = Queue(exchange=exchange, bindings=[ 
    binding(exchange, routing_key='foo.#'), 
    binding(exchange, routing_key='bar.#') 
], exclusive=True) 

そして、それは素晴らしい作品!

+0

'binding'パラメータとして提供している' Queue'に 'exchange'パラメータを指定する必要はありませんか? – naoko

関連する問題