2016-03-31 11 views
2

現在、私はasyncioでPython 3.5で私の最初のステップをやっています。明らかに私はコルーチンを完全に理解していません...Pythonでマルチスレッドasyncio

ここでは私がやっていることの単純化バージョンです。

私のクラスには、新しいスレッドを作成するopen()メソッドがあります。そのスレッド内で、新しいイベントループといくつかのホストへのソケット接続を作成します。それから私はループを永遠に走らせます。接続を停止

def open(self): 
    # create thread 
    self.thread = threading.Thread(target=self._thread) 
    self.thread.start() 
    # wait for connection 
    while self.protocol is None: 
     time.sleep(0.1) 

def _thread(self): 
    # create loop, connection and run forever 
    self.loop = asyncio.new_event_loop() 
    coro = self.loop.create_connection(lambda: MyProtocol(self.loop), 
             'somehost.com', 1234) 
    self.loop.run_until_complete(coro) 
    self.loop.run_forever() 

は、私はちょうどメインスレッドからループを停止し、今非常に簡単です:

loop.call_soon_threadsafe(loop.stop) 

は、残念ながら、私は特に、私はから切断する前にキューを空にする必要があり、いくつかのクリーンアップを行う必要がありますサーバー。だから私はMyProtocolでこのstop()メソッドのようなものを試してみました:

class MyProtocol(asyncio.Protocol): 
    def __init__(self, loop): 
     self._loop = loop 
     self._queue = [] 

    async def stop(self): 
     # wait for all queues to empty 
     while self._queue: 
      await asyncio.sleep(0.1) 
     # disconnect 
     self.close() 
     self._loop.stop() 
キューは、プロトコルのdata_received()メソッド内から空に取得し

ので、私はちょうどasyncioでwhileループを使用して発生することを待ちたいです.sleep()呼び出し。その後、私は接続を閉じて、ループを停止します。

しかし、このメソッドをメインスレッドからどのように呼び出すか、それを待つのですか? (プロトコルはMyProtocolの現在使用さインスタンスである)私は次のことを試してみましたが、それらのどれも動作するようには思えません:

loop.call_soon_threadsafe(protocol.stop) 
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop)) 
asyncio.ensure_future(protocol.stop(), loop=loop) 

誰もがここで私を助けてくださいことはできますか?ありがとう!

答えて

4

基本的には、別のスレッドのループでコルーチンをスケジュールする必要があります。あなたはrun_coroutine_threadsafeを使用することができます。

future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop) 
future.result() # wait for results 

か、古いスタイルasyncが好きhttps://stackoverflow.com/a/32084907/681044

import asyncio 
from threading import Thread 

loop = asyncio.new_event_loop() 

def f(loop): 
    asyncio.set_event_loop(loop) 
    loop.run_forever() 

t = Thread(target=f, args=(loop,)) 
t.start()  

@asyncio.coroutine 
def g(): 
    yield from asyncio.sleep(1) 
    print('Hello, world!') 

loop.call_soon_threadsafe(asyncio.async, g()) 
+0

Worksで魔法のように、ありがとう! –