2016-05-19 3 views
0

私はESB(zato)によって呼び出されるサービスを持っていますが、このサービスの役割はAMQPを介してrabbitMQにメッセージを公開することですrabbitMQの返信キューが空です。私はZATから発信AMQPを使用してメッセージを取得しようとしています

で:私は次のようになるのRabbitMQに相談し、メッセージキュー答えはあるがzatoサービスからウサギのキューから消費するzato

from zato.server.service import Service 

class HelloService(Service): 
    def handle(self): 

     # Request parameters 
     msg = 'Hello AMQP broker!' 
     out_name = 'My CRM connection' 
     exchange = 'My exchange' 
     routing_key = '' 
     properties = {'app_id': 'ESB'} 
     headers = {'X-Foo': 'bar'} 

     # Send a message to the broker 
     self.outgoing.amqp.send(msg, out_name, exchange, routing_key, 
      properties, headers) 
+0

「交換」の種類はありますか?どのようにキューを交換先にバインドしましたか? – Gabriele

+0

ウサギのデビューしたIMと交換のタイプが話題になりました、私は交換にキューをバインドする方法を知らなかった – hend

答えて

1

にサービスがempty.thisでいっぱい実施例である取得しますウサギ

  1. 公開または消費するzatoサービスを書くzato
  2. に出AMQP接続定義のそれぞれで独立し作成
  3. zato
  4. に接続定義を作成して交流にキュー
  5. バインドキューを作成します
  6. 交換を作成します。

最初の3つのステップは、ここでは、複数の方法で行うことができますが(ちょうど昆布をインストールして、クリックする)ことを行うために使用することができ、簡単なPythonスクリプトです:

import click 
import os 
import sys 
import settings 
from kombu import Connection, Exchange, Queue 


BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(user=settings.RABBIT_USER, 
                     password=settings.RABBIT_PASS, 
                     server=settings.RABBIT_SERVER, 
                     port=settings.RABBIT_PORT, 
                     vhost=settings.RABBIT_VHOST) 


@click.command() 
@click.option('--remove/--no-remove', default=False, help='Remove current Queues/Exchanges.') 
@click.option('--create/--no-create', default=False, help='Create needed Queues/Exchanges') 
def job(remove, create): 
    exchanges = {'dead_letter': Exchange(name=settings.DEAD_LETTER_EXCHANGE, 
             type=settings.DEAD_LETTER_EXCHANGE_TYPE, 
             durable=settings.DEAD_LETTER_EXCHANGE_DURABLE), 
       'results': Exchange(name=settings.RESULTS_EXCHANGE_NAME, 
            type=settings.RESULTS_EXCHANGE_TYPE, 
            durable=settings.RESULTS_EXCHANGE_DURABLE)} 

    queues = {'dead_letter': Queue(name=settings.DEAD_LETTER_QUEUE, 
            exchange=exchanges['dead_letter'], 
            routing_key=settings.DEAD_LETTER_ROUTING, 
            durable=settings.DEAD_LETTER_EXCHANGE_DURABLE), 
       'results': Queue(name=settings.RESULTS_QUEUE_NAME, 
           exchange=exchanges['results'], 
           routing_key=settings.RESULTS_QUEUE_ROUTING, 
           durable=settings.RESULTS_EXCHANGE_DURABLE), 
       'task': Queue(name=settings.TASK_QUEUE_NAME, 
          exchange=exchanges['results'], 
          routing_key=settings.TASK_ROUTING_KEY, 
          queue_arguments={ 
           "x-message-ttl": settings.TASK_QUEUE_TTL, 
           "x-dead-letter-exchange": settings.DEAD_LETTER_EXCHANGE, 
           "x-dead-letter-routing-key": settings.DEAD_LETTER_ROUTING})} 

    print 'using broker: {}'.format(BROKER_URL) 

    with Connection(BROKER_URL) as conn: 
     channel = conn.channel() 
     if remove: 
      # remove exchanges 
      for (key, exchange) in exchanges.items(): 
       print 'removing exchange: {}'.format(exchange.name) 
       bound_exchange = exchange(channel) 
       bound_exchange.delete() 

      # remove queues 
      for (key, queue) in queues.items(): 
       print 'removing queue {} '.format(queues[key].name) 
       bound_queue = queues[key](channel) 
       bound_queue.delete() 

     if create: 
      # create exchanges 
      for (key, exchange) in exchanges.items(): 
       print 'creating exchange: {}'.format(exchange.name) 
       bound_exchange = exchange(channel) 
       bound_exchange.declare() 

      # add queues 
      for (key, queue) in queues.items(): 
       # if key in exchanges: 
       print 'binding queue {} to exchange {} with routing key {}'.format(queue.name, 
                        queue.exchange.name, 
                        queue.routing_key) 
       bound_queue = queue(channel) 
       bound_queue.declare() 


if __name__ == '__main__': 
    job() 

と設定ファイル:

# rabbit stuff 
RABBIT_SERVER = 'localhost' 
RABBIT_USER = 'guest' 
RABBIT_PASS = 'guest' 
RABBIT_PORT = 5672 
RABBIT_VHOST = '/' 

# default task queue 
TASK_EXCHANGE_NAME = 'test.service.request' 
TASK_EXCHANGE_TYPE = 'direct' 
TASK_EXCHANGE_DURABLE = True 
TASK_QUEUE_NAME = 'test.service.request' 
TASK_ROUTING_KEY = 'request' 
TASK_QUEUE_TTL = 604800000 

# dead letter settings 
DEAD_LETTER_EXCHANGE = 'test.service.deadletter' 
DEAD_LETTER_EXCHANGE_TYPE = 'direct' 
DEAD_LETTER_EXCHANGE_DURABLE = True 
DEAD_LETTER_QUEUE = 'test.service.deadletter' 
DEAD_LETTER_ROUTING = 'deadletter' 

# results settings 
RESULTS_EXCHANGE_NAME = 'test.service.results' 
RESULTS_EXCHANGE_TYPE = 'direct' 
RESULTS_EXCHANGE_DURABLE = True 
RESULTS_QUEUE_NAME = 'test.service.results' 
RESULTS_QUEUE_ROUTING = 'results' 

は今のpython 2.7で新鮮virtualenvの上で上記のスクリプトを実行しているキューを作成することができます:

$ virtualenv rabbit_test 
New python executable in /home/ivan/rabbit_test/bin/python 
Installing setuptools, pip, wheel...done. 

$ source /home/ivan/rabbit_test/bin/activate 

$ pip install kombu 
Collecting kombu 
... 
$ pip install click 
Collecting click 
... 

$ mkdir ~/rabbit_test/app 
$ vi ~/rabbit_test/app/create_queues.py 
$ vi ~/rabbit_test/app/settings.py 
上記のスクリプトをコピーします

を実行し、create_queues.pyを実行します。

$ cd ~/rabbit_test/app 
$ python create_queues.py --create 
using broker: amqp://guest:[email protected]:5672// 
creating exchange: test.service.results 
creating exchange: test.service.deadletter 
binding queue test.service.request to exchange test.service.results with routing key request 
binding queue test.service.results to exchange test.service.results with routing key results 
binding queue test.service.deadletter to exchange test.service.deadletter with routing key deadletter 

あなたは、交換を確認することができますし、キューがcliツールまたはmanagement pluginとウサギの上にある:使用して行うことができます

$ rabbitmqadmin list exchanges 
+-------------------------+---------+ 
|   name   | type | 
+-------------------------+---------+ 
| test.service.deadletter | direct | 
| test.service.results | direct | 
+-------------------------+---------+ 

$ rabbitmqadmin list queues 
+-------------------------+----------+ 
|   name   | messages | 
+-------------------------+----------+ 
| test.service.deadletter | 0  | 
| test.service.request | 0  | 
| test.service.results | 0  | 
+-------------------------+----------+ 

$ rabbitmqadmin list bindings 
+-------------------------+-------------------------+-------------------------+ 
|   source   |  destination  |  routing_key  | 
+-------------------------+-------------------------+-------------------------+ 
|       | test.service.deadletter | test.service.deadletter | 
|       | test.service.request | test.service.request | 
|       | test.service.results | test.service.results | 
| test.service.deadletter | test.service.deadletter | deadletter    | 
| test.service.results | test.service.request | request     | 
| test.service.results | test.service.results | results     | 
+-------------------------+-------------------------+-------------------------+ 

今zato一部(ステップ4,5および6) public api、またはwebadminを使用して、パブリックAPIを使用してそれを行う方法を示しますが、UIを介して行うのはこれがほんのわずかです。

作成し、当社のAMQP接続doc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{ 
    "id": 2, 
    "password1": "guest", 
    "password2": "guest" 
}' "http://localhost:11223/zato/json/zato.definition.amqp.change-password" 

{ 
    "zato_env": { 
    "details": "", 
    "result": "ZATO_OK", 
    "cid": "K07K9YY21XZAX4QKWJB3ZFXN2ZFT" 
    } 
} 

のための発信AMQP接続定義のそれぞれで独立しdoc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{ 
    "cluster_id": 1, 
    "name": "SO_Test", 
    "host": "127.0.0.1", 
    "port": "5672", 
    "vhost": "/", 
    "username": "guest", 
    "frame_max": 131072, 
    "heartbeat": 10 
}' "http://localhost:11223/zato/json/zato.definition.amqp.create" 

{ 
    "zato_env": { 
    "details": "", 
    "result": "ZATO_OK", 
    "cid": "K04DWBPMYF8A7768C7N482E75YM3" 
    }, 
    "zato_definition_amqp_create_response": { 
    "id": 2, 
    "name": "SO_Test" 
    } 
} 

パスワードの設定]をAMQP接続定義を作成しますdoc

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{ 
    "cluster_id": 1, 
    "name": "SO Test", 
    "is_active": true, 
    "def_id": 2, 
    "delivery_mode": 1, 
    "priority": 6, 
    "content_type": "application/json", 
    "content_encoding": "utf-8", 
    "expiration": 30000 
}' "http://localhost:11223/zato/json/zato.outgoing.amqp.create" 

{ 
    "zato_outgoing_amqp_create_response": { 
    "id": 1, 
    "name": "SO Test" 
    }, 
    "zato_env": { 
    "details": "", 
    "result": "ZATO_OK", 
    "cid": "K05F2CR954BFNBP14KGTM26V47PC" 
    } 
} 

最後に、あなたがプロパティを使用しようとしている場合は

from zato.server.service import Service 

class HelloService(Service): 
    def handle(self): 
     # Request parameters 
     msg = 'Hello AMQP broker!' 
     out_name = 'SO Test' 
     exchange = 'test.service.results' 
     routing_key = 'request' 
     properties = {'app_id': 'ESB', 'user_id': 'guest'} 
     headers = {'X-Foo': 'bar'} 

     # Send a message to the broker 
     info = self.outgoing.amqp.send(msg, out_name, exchange, routing_key, 
      properties, headers) 
     self.logger.info(info) 

メッセージを送信しようとしているサービスは、接続のuser_idまたは他の要求は失敗しますと一致している必要がありUSER_ID。

はまた、ここで私はdeadletter交換を作成しましたし、最後のステップは

へをテストすることですそのまだtest.service.requestキュー内

場合、メッセージは30秒後にここに送られることに注意してくださいメッセージがキューに配信されていることを確認し、http/soapチャネルを作成したり、サービスを直接呼び出すことができます。後者はパブリックAPIを使用して行います。

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{ 
    "name": "test.hello-service", 
    "data_format": "json" 
}' "http://localhost:11223/zato/json/zato.service.invoke" 

{ 
    "zato_env": { 
    "details": "", 
    "result": "ZATO_OK", 
    "cid": "K050J64QQ8FXASXHKVCAQNC4JC4N" 
    }, 
    "zato_service_invoke_response": { 
    "response": "" 
    } 
} 

その後、我々は我々だけで送信されたメッセージのキューをチェック:

$ rabbitmqadmin get queue=test.service.request requeue=true 
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+ 
| routing_key |  exchange  | message_count |  payload  | payload_bytes | payload_encoding | redelivered | 
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+ 
| request  | test.service.results | 0    | Hello AMQP broker! | 18   | string   | False  | 
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+ 

は、あなたはまだ何か問題がある場合にはウサギとzatoサーバーのログをチェックすることを忘れないでください。

関連する問題