にサービスがempty.thisでいっぱい実施例である取得しますウサギ
- 公開または消費するzatoサービスを書くzato
に出AMQP接続定義のそれぞれで独立し作成
- zato
に接続定義を作成して交流にキュー
- バインドキューを作成します
- 交換を作成します。
最初の3つのステップは、ここでは、複数の方法で行うことができますが(ちょうど昆布をインストールして、クリックする)ことを行うために使用することができ、簡単なPythonスクリプトです:
import click
import os
import sys
import settings
from kombu import Connection, Exchange, Queue
BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(user=settings.RABBIT_USER,
password=settings.RABBIT_PASS,
server=settings.RABBIT_SERVER,
port=settings.RABBIT_PORT,
vhost=settings.RABBIT_VHOST)
@click.command()
@click.option('--remove/--no-remove', default=False, help='Remove current Queues/Exchanges.')
@click.option('--create/--no-create', default=False, help='Create needed Queues/Exchanges')
def job(remove, create):
exchanges = {'dead_letter': Exchange(name=settings.DEAD_LETTER_EXCHANGE,
type=settings.DEAD_LETTER_EXCHANGE_TYPE,
durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
'results': Exchange(name=settings.RESULTS_EXCHANGE_NAME,
type=settings.RESULTS_EXCHANGE_TYPE,
durable=settings.RESULTS_EXCHANGE_DURABLE)}
queues = {'dead_letter': Queue(name=settings.DEAD_LETTER_QUEUE,
exchange=exchanges['dead_letter'],
routing_key=settings.DEAD_LETTER_ROUTING,
durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
'results': Queue(name=settings.RESULTS_QUEUE_NAME,
exchange=exchanges['results'],
routing_key=settings.RESULTS_QUEUE_ROUTING,
durable=settings.RESULTS_EXCHANGE_DURABLE),
'task': Queue(name=settings.TASK_QUEUE_NAME,
exchange=exchanges['results'],
routing_key=settings.TASK_ROUTING_KEY,
queue_arguments={
"x-message-ttl": settings.TASK_QUEUE_TTL,
"x-dead-letter-exchange": settings.DEAD_LETTER_EXCHANGE,
"x-dead-letter-routing-key": settings.DEAD_LETTER_ROUTING})}
print 'using broker: {}'.format(BROKER_URL)
with Connection(BROKER_URL) as conn:
channel = conn.channel()
if remove:
# remove exchanges
for (key, exchange) in exchanges.items():
print 'removing exchange: {}'.format(exchange.name)
bound_exchange = exchange(channel)
bound_exchange.delete()
# remove queues
for (key, queue) in queues.items():
print 'removing queue {} '.format(queues[key].name)
bound_queue = queues[key](channel)
bound_queue.delete()
if create:
# create exchanges
for (key, exchange) in exchanges.items():
print 'creating exchange: {}'.format(exchange.name)
bound_exchange = exchange(channel)
bound_exchange.declare()
# add queues
for (key, queue) in queues.items():
# if key in exchanges:
print 'binding queue {} to exchange {} with routing key {}'.format(queue.name,
queue.exchange.name,
queue.routing_key)
bound_queue = queue(channel)
bound_queue.declare()
if __name__ == '__main__':
job()
と設定ファイル:
# rabbit stuff
RABBIT_SERVER = 'localhost'
RABBIT_USER = 'guest'
RABBIT_PASS = 'guest'
RABBIT_PORT = 5672
RABBIT_VHOST = '/'
# default task queue
TASK_EXCHANGE_NAME = 'test.service.request'
TASK_EXCHANGE_TYPE = 'direct'
TASK_EXCHANGE_DURABLE = True
TASK_QUEUE_NAME = 'test.service.request'
TASK_ROUTING_KEY = 'request'
TASK_QUEUE_TTL = 604800000
# dead letter settings
DEAD_LETTER_EXCHANGE = 'test.service.deadletter'
DEAD_LETTER_EXCHANGE_TYPE = 'direct'
DEAD_LETTER_EXCHANGE_DURABLE = True
DEAD_LETTER_QUEUE = 'test.service.deadletter'
DEAD_LETTER_ROUTING = 'deadletter'
# results settings
RESULTS_EXCHANGE_NAME = 'test.service.results'
RESULTS_EXCHANGE_TYPE = 'direct'
RESULTS_EXCHANGE_DURABLE = True
RESULTS_QUEUE_NAME = 'test.service.results'
RESULTS_QUEUE_ROUTING = 'results'
は今のpython 2.7で新鮮virtualenvの上で上記のスクリプトを実行しているキューを作成することができます:
$ virtualenv rabbit_test
New python executable in /home/ivan/rabbit_test/bin/python
Installing setuptools, pip, wheel...done.
$ source /home/ivan/rabbit_test/bin/activate
$ pip install kombu
Collecting kombu
...
$ pip install click
Collecting click
...
が
$ mkdir ~/rabbit_test/app
$ vi ~/rabbit_test/app/create_queues.py
$ vi ~/rabbit_test/app/settings.py
上記のスクリプトをコピーします
を実行し、create_queues.pyを実行します。
$ cd ~/rabbit_test/app
$ python create_queues.py --create
using broker: amqp://guest:[email protected]:5672//
creating exchange: test.service.results
creating exchange: test.service.deadletter
binding queue test.service.request to exchange test.service.results with routing key request
binding queue test.service.results to exchange test.service.results with routing key results
binding queue test.service.deadletter to exchange test.service.deadletter with routing key deadletter
あなたは、交換を確認することができますし、キューがcliツールまたはmanagement pluginとウサギの上にある:使用して行うことができます
$ rabbitmqadmin list exchanges
+-------------------------+---------+
| name | type |
+-------------------------+---------+
| test.service.deadletter | direct |
| test.service.results | direct |
+-------------------------+---------+
$ rabbitmqadmin list queues
+-------------------------+----------+
| name | messages |
+-------------------------+----------+
| test.service.deadletter | 0 |
| test.service.request | 0 |
| test.service.results | 0 |
+-------------------------+----------+
$ rabbitmqadmin list bindings
+-------------------------+-------------------------+-------------------------+
| source | destination | routing_key |
+-------------------------+-------------------------+-------------------------+
| | test.service.deadletter | test.service.deadletter |
| | test.service.request | test.service.request |
| | test.service.results | test.service.results |
| test.service.deadletter | test.service.deadletter | deadletter |
| test.service.results | test.service.request | request |
| test.service.results | test.service.results | results |
+-------------------------+-------------------------+-------------------------+
今zato一部(ステップ4,5および6) public api、またはwebadminを使用して、パブリックAPIを使用してそれを行う方法を示しますが、UIを介して行うのはこれがほんのわずかです。
作成し、当社のAMQP接続doc
$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
"id": 2,
"password1": "guest",
"password2": "guest"
}' "http://localhost:11223/zato/json/zato.definition.amqp.change-password"
{
"zato_env": {
"details": "",
"result": "ZATO_OK",
"cid": "K07K9YY21XZAX4QKWJB3ZFXN2ZFT"
}
}
のための発信AMQP接続定義のそれぞれで独立しdoc
$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
"cluster_id": 1,
"name": "SO_Test",
"host": "127.0.0.1",
"port": "5672",
"vhost": "/",
"username": "guest",
"frame_max": 131072,
"heartbeat": 10
}' "http://localhost:11223/zato/json/zato.definition.amqp.create"
{
"zato_env": {
"details": "",
"result": "ZATO_OK",
"cid": "K04DWBPMYF8A7768C7N482E75YM3"
},
"zato_definition_amqp_create_response": {
"id": 2,
"name": "SO_Test"
}
}
パスワードの設定]をAMQP接続定義を作成しますdoc
curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
"cluster_id": 1,
"name": "SO Test",
"is_active": true,
"def_id": 2,
"delivery_mode": 1,
"priority": 6,
"content_type": "application/json",
"content_encoding": "utf-8",
"expiration": 30000
}' "http://localhost:11223/zato/json/zato.outgoing.amqp.create"
{
"zato_outgoing_amqp_create_response": {
"id": 1,
"name": "SO Test"
},
"zato_env": {
"details": "",
"result": "ZATO_OK",
"cid": "K05F2CR954BFNBP14KGTM26V47PC"
}
}
最後に、あなたがプロパティを使用しようとしている場合は
from zato.server.service import Service
class HelloService(Service):
def handle(self):
# Request parameters
msg = 'Hello AMQP broker!'
out_name = 'SO Test'
exchange = 'test.service.results'
routing_key = 'request'
properties = {'app_id': 'ESB', 'user_id': 'guest'}
headers = {'X-Foo': 'bar'}
# Send a message to the broker
info = self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
properties, headers)
self.logger.info(info)
メッセージを送信しようとしているサービスは、接続のuser_idまたは他の要求は失敗しますと一致している必要がありUSER_ID。
はまた、ここで私はdeadletter交換を作成しましたし、最後のステップは
へをテストすることですそのまだtest.service.request
キュー内
場合、メッセージは30秒後にここに送られることに注意してくださいメッセージがキューに配信されていることを確認し、http/soapチャネルを作成したり、サービスを直接呼び出すことができます。後者はパブリックAPIを使用して行います。
curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
"name": "test.hello-service",
"data_format": "json"
}' "http://localhost:11223/zato/json/zato.service.invoke"
{
"zato_env": {
"details": "",
"result": "ZATO_OK",
"cid": "K050J64QQ8FXASXHKVCAQNC4JC4N"
},
"zato_service_invoke_response": {
"response": ""
}
}
その後、我々は我々だけで送信されたメッセージのキューをチェック:
$ rabbitmqadmin get queue=test.service.request requeue=true
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| request | test.service.results | 0 | Hello AMQP broker! | 18 | string | False |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
は、あなたはまだ何か問題がある場合にはウサギとzatoサーバーのログをチェックすることを忘れないでください。
「交換」の種類はありますか?どのようにキューを交換先にバインドしましたか? – Gabriele
ウサギのデビューしたIMと交換のタイプが話題になりました、私は交換にキューをバインドする方法を知らなかった – hend