2017-08-05 10 views
0
while True: 
     if (len(queue) > 0): 
      blocking_send_from_queue() 
      there_is_something_to_send = False 
     else: 
      break 
     data = non_blocking_recv() 
     queue = process(queue, data) 

これは、クライアントからのコードであり、クエリを処理することになっています。pythonのasyncioはこの機能を改善するために何ができますか?

クエリを完了するには、受信したデータに基づいてより多くのサブクエリを作成してサーバーに送信し、サブクエリの結果などを処理する必要があります。

asyncioを使用して、より効率的な結果や拡張された結果を作成できますか?あなたはこのような何かしようとするかもしれ

+0

本当に競合状態になりやすいようです。 – user2357112

答えて

0

import asyncio 

loop = asyncio.get_event_loop() 
data_queue = asyncio.Queue(loop=loop) 
receiving = True 

def forward(data): 
    queue = process(queue, data) 
    if (len(queue) > 0): 
     blocking_send_from_queue() 
     return True 
    else: 
     return False 

async def receive(): 
    while receiving: 
     data = await non_blocking_recv() 
     await data_queue.put(data) 

async def main(): 
    receiver = loop.create_task(receive()) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, forward, data): 
      global receiving 
      receiving = False 
      data_queue.get_no_wait() 
      break 
    await receiver 

loop.run_until_complete(main()) 

だからを、non_blocking_recv()blocking_send_from_queue()data_queueとメモリにdataブロックをバッファリングのコストで、お互いをブロックすることなく、並列に実行することができます。キューサイズをdata_queueに設定して、バッファサイズを制御できます。

さらに、process()およびblocking_send_from_queue()は、並行して実行できる方法のようです。実際の実装によりますが、queueは永続的なものであると仮定しますthreading.Queueオブジェクトはここにあります。理想的には、最大1つnon_blocking_recv、1 processと1 blocking_send_from_queueを並列に実行することができ

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, do_process, data): 
      break 

:に変更されなければならない(シャットダウンコードをスキップ)

def do_process(data): 
    process(queue, data) 
    return len(queue) > 0 

def forward(): 
    while True: 
     blocking_send_from_queue() 

そしてmain:したがってforwardはに分割することができますGILがblocking_send_from_queueの中に適切にリリースされているかどうか。

最後

、GILもprocessに放出され、そして送信順序が受注と同じであることが要求されない場合、並列にprocessを実行することにより、マルチコア・コンピューティング能力を十分に活用することも可能である場合: (部分コード)

async def process_worker(): 
    while True: 
     data = await data_queue.get() 
     loop.run_in_executor(None, process, queue, data) 

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    for _ in range(CPU_CORE_NUM): 
     loop.create_task(process_worker()) 
    # wait for all tasks here 
関連する問題