:
import asyncio
loop = asyncio.get_event_loop()
data_queue = asyncio.Queue(loop=loop)
receiving = True
def forward(data):
queue = process(queue, data)
if (len(queue) > 0):
blocking_send_from_queue()
return True
else:
return False
async def receive():
while receiving:
data = await non_blocking_recv()
await data_queue.put(data)
async def main():
receiver = loop.create_task(receive())
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, forward, data):
global receiving
receiving = False
data_queue.get_no_wait()
break
await receiver
loop.run_until_complete(main())
だからを、non_blocking_recv()
とblocking_send_from_queue()
はdata_queue
とメモリにdata
ブロックをバッファリングのコストで、お互いをブロックすることなく、並列に実行することができます。キューサイズをdata_queue
に設定して、バッファサイズを制御できます。
さらに、process()
およびblocking_send_from_queue()
は、並行して実行できる方法のようです。実際の実装によりますが、queue
は永続的なものであると仮定しますthreading.Queue
オブジェクトはここにあります。理想的には、最大1つnon_blocking_recv
、1 process
と1 blocking_send_from_queue
を並列に実行することができ
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, do_process, data):
break
:に変更されなければならない(シャットダウンコードをスキップ)
def do_process(data):
process(queue, data)
return len(queue) > 0
def forward():
while True:
blocking_send_from_queue()
そしてmain
:したがってforward
はに分割することができますGILがblocking_send_from_queue
の中に適切にリリースされているかどうか。
最後
、GILもprocess
に放出され、そして送信順序が受注と同じであることが要求されない場合、並列にprocess
を実行することにより、マルチコア・コンピューティング能力を十分に活用することも可能である場合: (部分コード)
async def process_worker():
while True:
data = await data_queue.get()
loop.run_in_executor(None, process, queue, data)
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
for _ in range(CPU_CORE_NUM):
loop.create_task(process_worker())
# wait for all tasks here
本当に競合状態になりやすいようです。 – user2357112