2016-08-11 14 views
0

私は、複数のキューを同時に使用するために、asyncioとasynqpを使用しています。Asyncioとrabbitmq(asynqp):複数のキューから同時に消費する方法

なぜ私のasyncio.sleep()関数呼び出しが効果を持たないのか分かりません。コードはそこで一時停止しません。公平であるためには、コールバックがどのコンテキストで実行されているのか、イベントループにコントロールbavckを与えることができるのかどうかはわかりません(asyncio.sleep()コールが意味をなされるように)。

どうすれば私のprocess_msgコールバック関数でaiohttp.ClientSession.get()関数呼び出しを使用しなければなりませんでしたか?私はコルーチンではないので、私はそれを行うことができません。私の現在のasyncioの理解を超えた方法が必要です。

#!/usr/bin/env python3 

import asyncio 
import asynqp 


USERS = {'betty', 'bob', 'luis', 'tony'} 


def process_msg(msg): 
    asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

async def connect(): 
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test') 
    channel = await connection.open_channel() 
    exchange = await channel.declare_exchange('inboxes', 'direct') 

    # we have 10 users. Set up a queue for each of them 
    # use different channels to avoid any interference 
    # during message consumption, just in case. 
    for username in USERS: 
     user_channel = await connection.open_channel() 
     queue = await user_channel.declare_queue('Inbox_{}'.format(username)) 
     await queue.bind(exchange, routing_key=username) 
     await queue.consume(process_msg) 

    # deliver 10 messages to each user 
    for username in USERS: 
     for msg_idx in range(10): 
      msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username)) 
      exchange.publish(msg, routing_key=username) 


loop = asyncio.get_event_loop() 
loop.run_until_complete(connect()) 
loop.run_forever() 

答えて

1

なぜ私のasyncio.sleep()関数呼び出しには 効果がないのか分かりません。

asyncio.sleep()はイベントループ(又はasync/awaitセマンティクス)と組み合わせて使用​​されなければならない将来のオブジェクトを返すため。

簡易def宣言でawaitを使用することはできません。コールバックは、フードの下のイベントループに添付されているasync/awaitコンテキスト外で呼び出されるためです。言い換えれば、async/awaitスタイルのコールバックスタイルを混合することは非常に難しいです。即ちprocess_msgの本体は、一方_process_msgで実行されていない_process_msg関数には再帰が存在しないこと

async def process_msg(msg): 
    await asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

def _process_msg(msg): 
    loop = asyncio.get_event_loop() 
    loop.create_task(process_msg(msg)) 
    # or if loop is always the same one single line is enough 
    # asyncio.ensure_future(process_msg(msg)) 

# some code 
await queue.consume(_process_msg) 

注:

しかし単純な解決策は、バックイベントループに作業をスケジュールすることです。コントロールがイベントループに戻ると、内部のprocess_msg関数が呼び出されます。

これは、次のコードで一般化することができます。

def async_to_callback(coro): 
    def callback(*args, **kwargs): 
     asyncio.ensure_future(coro(*args, **kwargs)) 
    return callback 

async def process_msg(msg): 
    # the body 

# some code 
await queue.consume(async_to_callback(process_msg)) 
関連する問題