2017-05-17 9 views
0

rabbitpyを使用するプログラムを実行しようとすると、あるキューのターミナルからコマンドを送信し、別のキューの別のスレッドで応答を読み込みます。問題は、queue1でメッセージを送信すると、リスナー・スレッド(queue1ではなくqueue2で待機している必要があります)がqueue1からメッセージを読み込み/消費することです。以下は私のコードです。すべての大文字の定数は文字列定数だけであり、mqsetupはそれらが存在することを確認するためにキューとルーチンキーを設定します。私は間違って何をしていますか?Rabbitpyリスナー間違ったキューからの読み込み

#################################################################### 
# dummy_ctrl.py 
# This is a dummy script that acts like a controller. It lets a 
# user/tester send task requests to our task handler thread pool 
# and get results back 
#################################################################### 
import threading 
import json 
import rabbitpy 
import mqsetup 
import traceback 

from constants import (
    AMQP_URL, 
    CTRL_EXCHANGE, 
    CTRL_QUEUE, 
    CTRL_RESPONSE_QUEUE, 
    CTRL_ROUTING_KEY, 
    CTRL_RESPONSE_ROUTING_KEY 
) 

# 
# This class listens for responses from our task thread pool program 
# When it gets a response message it just prints it 
# 
class MQResponseListener(object): 
    def __init__(self,queue_name=CTRL_RESPONSE_QUEUE): 
     self.queue_name=queue_name 
     self.running=False 
     self.conn=None 
     self.channel=None 
     self.exchange=None 
     self.queue=None 

    # 
    # This is the listener function that runs in an infinite loop in a thread. 
    # It keeps going until the user stops it with a stop call 
    # 
    def run_listener(self): 
     print "Running listener" 
     with rabbitpy.Connection(AMQP_URL) as self.conn: 
      # Open the channel to communicate with RabbitMQ 
      print "Got self.conn" 
      with self.conn.channel() as self.channel: 
       print "Got self.channel" 
       self.queue = rabbitpy.Queue(self.channel, self.queue_name) 
       print "Got self.queue on queue name %s" % self.queue_name 
       print "Finished listener setup, now wait for messages" 

       for message in self.queue.consume(): 
        if message: 
         print "***********************************************************************" 
         print "Received Message!" 
         print "***********************************************************************" 
         print "Message body is %s" % message.body 
         message.ack() 
         print "Waiting for next message" 
        else: 
         print "Message is None, did we just call stop consuming?" 
         break 
       print "Finished consuming" 

     print "End of run_listener" 

    # 
    # This launches the listener as a thread and then returns 
    # The thread will run forever (persistent listener) 
    # 
    def start(self): 
     self.thread=threading.Thread(target=self.run_listener) 
     self.running=True 
     print "Starting listener" 
     self.thread.start() 

    def stop(self): 
     print "stopping consuming" 
     self.running=False 
     self.queue.stop_consuming() 
     print "end of listener.stop()" 

    def join(self): 
     self.thread.join() 

def publish_message(channel,body_value): 
    # 
    # Create the message to publish 
    # 
    message = rabbitpy.Message(channel, body_value) 

    # 
    # Publish the message, looking for the return value to be a bool True/False 
    # 
    if message.publish(CTRL_EXCHANGE, routing_key=CTRL_ROUTING_KEY, mandatory=True): 
     print 'Message publish confirmed by RabbitMQ' 
    else: 
     print 'RabbitMQ indicates message publishing failure' 

if __name__ == '__main__': 

    mqsetup.setup() 

    listener=MQResponseListener() 
    listener.start() 

    with rabbitpy.Connection(AMQP_URL) as conn: 
     # Open the channel to communicate with RabbitMQ 
     with conn.channel() as channel: 

      exchange = rabbitpy.Exchange(channel, CTRL_EXCHANGE, exchange_type='direct') 
      exchange.declare() 

      # Turn on publisher confirmations 
      channel.enable_publisher_confirms() 

      queue = rabbitpy.Queue(channel, CTRL_QUEUE) 
      queue.durable = True 
      queue.declare() 

      queue.bind(exchange, routing_key=CTRL_ROUTING_KEY) 

      #queue = rabbitpy.Queue(channel, CTRL_QUEUE) 

      while True: 
       print "Options:" 
       print "1. Call function some_dummy_command" 
       print "2. Quit" 
       try: 
        opt=int(raw_input('Select option number:')) 
        if opt==1: 
         # 
         # create message body in cmdstr 
         # 
         print "Sending a message" 
         cmdDict={ 
          'ids': ['id1', 'id2', 'id3'], 
          'name': "some_dummy_command", 
          'args': [], 
          'kwargs': {} 
         } 
         cmdstr=json.dumps(cmdDict) 
         # 
         # publish message on channel 
         # 
         publish_message(channel,cmdstr) 
        elif opt==2: 
         print "Stopping the listener" 
         listener.stop() 
         print "Breaking" 
         break 
        else: 
         print "opt unknown: %s" % opt 

       except rabbitpy.exceptions.NotConsumingError: 
        break 
       except ValueError: 
        print "Not a number" 

    print "Waiting for listener thread to complete" 
    listener.join() 
    print "Done!" 

#################################################################################### 
# mqsetup.py 
# This module makes sure the exchanges, queues, etc all get set up in RabbitMQ. 
# This is crucial if not already set up, has no effect if they are already set up. 
#################################################################################### 
import rabbitpy 
from constants import (
    AMQP_URL, 
    CTRL_EXCHANGE, 
    CTRL_QUEUE, 
    CTRL_RESPONSE_QUEUE, 
    CTRL_ROUTING_KEY, 
    CTRL_RESPONSE_ROUTING_KEY 
) 

def setup(): 
    # Connect to RabbitMQ on localhost, port 5672 as guest/guest 
    conn=rabbitpy.Connection(AMQP_URL) 
    channel=conn.channel() 

    exchange = rabbitpy.Exchange(channel, CTRL_EXCHANGE, exchange_type='direct') 
    exchange.declare() 

    queue = rabbitpy.Queue(channel, CTRL_QUEUE) 
    queue.durable = True 
    queue.declare() 
    queue.bind(exchange, routing_key=CTRL_ROUTING_KEY) 

    channel.close() 
    conn.close() 

    ######### 

    # Connect to RabbitMQ on localhost, port 5672 as guest/guest 
    conn=rabbitpy.Connection(AMQP_URL) 
    channel=conn.channel() 

    exchange = rabbitpy.Exchange(channel, CTRL_EXCHANGE, exchange_type='direct') 
    exchange.declare() 

    queue2 = rabbitpy.Queue(channel, CTRL_RESPONSE_QUEUE) 
    queue2.durable = True 
    queue2.declare() 
    queue2.bind(exchange, routing_key=CTRL_RESPONSE_ROUTING_KEY) 

    channel.close() 
    conn.close() 
+0

交換機を作成するときは、exchange_type = 'direct'を使用することに注意してください。それでも同じ問題があります。 – Marc

答えて

0

いいえ、私の悪いです。もともと私の交換はファンアウトでした。後でdirectに変更しましたが、rabbitmq-serverを再起動しませんでしたが、それでも動作しませんでした。したがって、重要な点は、交換機を設定してサーバを再起動することです。今は期待どおりに動作します。

関連する問題