2017-05-29 9 views
0

イベントループを使用して、asyncio.Queue(ソースコードはhttps://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py)に挿入されているデータを監視したいのですが、いくつかの問題があります。ここでは、次のコードは、次のとおりです。手動でasyncioイベントループをオンに切り替える方法はありますか

import asyncio 
import threading 

async def recv(q): 
    while True: 
     msg = await q.get() 
     print(msg) 

async def checking_task(): 
    while True: 
     await asyncio.sleep(0.1) 

def loop_in_thread(loop,q): 
    asyncio.set_event_loop(loop) 
    asyncio.ensure_future(recv(q)) 
    asyncio.ensure_future(insert(q)) 
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended 
    loop.run_forever() 

async def insert(q): 
    print('invoked') 
    await q.put('hello') 

q = asyncio.Queue() 
loop = asyncio.get_event_loop() 
t = threading.Thread(target=loop_in_thread, args=(loop, q,)) 
t.start() 

プログラムが開始されていると我々は次のような結果

invoked 
hello 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>} 

を見ることができます。しかし、我々は手動でq.put_nowait('test')を使用してqにデータを追加した場合、今、私たちは次のような結果になるだろう:あなたが見ることができるように

q.put_nowait('test') # a non-async way to add data into queue 
-> print(asyncio.Task.all_tasks()) 
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39> 
wait_for=<Future finished result=None>>} 

、将来は、すでに終了して、まだ我々はまだ新しく追加された文字列をプリントアウトしていない'test'。言い換えれば、msg = await q.get()は、q.get()に関連する未来が実行され、他のタスクが実行されていなくても、まだ待機しています。公式ドキュメント(https://docs.python.org/3/library/asyncio-task.html)で、それは

結果は=待つが、将来または結果=未来からの収量が言うので、これは私を混乱 - 未来が完了するまでのコルーチンを中断し、その後、将来の結果を返し、

未来が完了したにもかかわらず、イベントループが処理タスクを続けるようにするには、まだ他の非同期関数にawaitの何らかの種類が必要であると思われました。

checking_task()を追加するこの問題を回避する方法を見つけました。また、そのコルーチンをイベントループに追加しました。意図したとおりに動作します。

しかし、check_task()コルーチンは、whileループを実行するだけなので、CPUには非常にコストがかかります。非同期機能を使用せずにそのawaitイベントをトリガする手動の方法があるかどうか疑問に思っています。たとえば、何かのような魔法のような

q.put_nowait('test') 
loop.ok_you_can_start_running_other_pending_tasks() 

非常に感謝します!ありがとう。

答えて

0

だから私は

loop.call_soon_threadsafe(q.put_nowait, 'test') 

を使用してしまったし、意図したように、それは動作します。これを理解した後、いくつかの情報を検索しました。この記事(Scheduling an asyncio coroutine from another thread)にも同じ問題があることが判明しました。そして、@のKFXの答えも)

loop.call_soon_threadsafe(loop.create_task, q.put('test')) 

お知らせasyncio.Queue.put(あるコルーチンですがasyncio.Queue.put_nowait()が正常な機能である、働くだろう。

関連する問題