私は、複数のキューを同時に使用するために、asyncioとasynqpを使用しています。Asyncioとrabbitmq(asynqp):複数のキューから同時に消費する方法
なぜ私のasyncio.sleep()
関数呼び出しが効果を持たないのか分かりません。コードはそこで一時停止しません。公平であるためには、コールバックがどのコンテキストで実行されているのか、イベントループにコントロールbavckを与えることができるのかどうかはわかりません(asyncio.sleep()
コールが意味をなされるように)。
どうすれば私のprocess_msg
コールバック関数でaiohttp.ClientSession.get()
関数呼び出しを使用しなければなりませんでしたか?私はコルーチンではないので、私はそれを行うことができません。私の現在のasyncioの理解を超えた方法が必要です。
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()