2017-07-16 8 views
0

私は2つのキュー、例えばq1とq2を持っています。これはバインディングキーb1とb2でe1とe2の交換に相当します。 q1とq2をそれぞれ聞くc1とc2と言って、コンシューマ関数を並列に実行したい。複数の待ち行列のためのrabbitmqの複数のコンシューマ

def c1(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e1', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q1') 
    queue_name = result.method.queue 
    binding_key = "b1" 
    channel.queue_bind(exchange='e1', 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

def c2(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e2', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q2') 
    queue_name = result.method.queue 
    binding_key = "b2" 
    channel.queue_bind(exchange=e1, 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

if __name__ == '__main__': 
    c1() 
    c2() 

しかし、それはc1関数とc2関数を聴いているだけで、実行されていません。どうすれば両方の機能を実行できますか? ありがとうございます。

EDIT:私はいくつかのマルチスレッド方式は順序である必要が同時に両方の機能を実行するために2つの異なるモジュール内のメソッドC1およびC1(ファイル)

+0

あなたは、Pythonスレッドモジュールを使用するか、ブロック接続の代わりに使用する必要があります。 – alphiii

答えて

1

を有します。いくつかのPythonの例では、hereを見てください。

コードはProcessクラスで変更されています。スレッドを使用することも、OSから明示的に実行することもできます。

import pika 
from multiprocessing import Process 


def callback(): 
    print 'callback got data' 


class c1(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e1', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q1') 
     queue_name = result.method.queue 
     binding_key = "b1" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 
     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 


class c2(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e2', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q2') 
     queue_name = result.method.queue 
     binding_key = "b2" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 

     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 

if __name__ == '__main__': 
    subscriber_list = [] 
    subscriber_list.append(c1()) 
    subscriber_list.append(c2()) 

    # execute 
    process_list = [] 
    for sub in subscriber_list: 
     process = Process(target=sub.run) 
     process.start() 
     process_list.append(process) 

    # wait for all process to finish 
    for process in process_list: 
     process.join() 
関連する問題