2016-09-13 7 views
0

私は、WebサービスとWebソケットを介して通信するPythonベースのCLIを作成しようとしています。私が遭遇している1つの問題は、CLIによってWebサービスに対して行われた要求が断続的に処理されないことです。私はイベントループが既に実行されている場合、コルーチンがメソッド内で同期して完了するのを待つ方法はありますか?

2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened 
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message 
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message 
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"} 
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed 

を:ソケットが閉じたWebサービスからのログを見ると、私は問題が頻繁にこれらの要求は(後であってもまたは)が同時に行われていることに起因していることを見ることができますCLI私は、Webサービスとのすべての直接通信を処理するクラスCommunicationServiceを定義します。内部的には、websocketsパッケージを使用して通信を処理します。それ自体はasyncioの上に構築されています。

CommunicationServiceは、リクエストを送信するための次のメソッドが含まれます。

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 
    asyncio.ensure_future(self._ws.send(request)) 

を...以前、別の方法で開くwsがWebSocketのです:私が欲しいもの

self._ws = await websockets.connect(websocket_address) 

は待つことができるようにすることです将来はasyncio.ensure_futureによって返され、必要に応じて、WebSocketが閉じられる前にWebサービスが要求を処理する時間を与えるために、少し後にスリープします。

ただし、send_requestは同期方式であるため、これらの先物を単にawaitとすることはできません。それを返すコルーチンのオブジェクトを待つことは何もないので、非同期にすることは無意味になります。ループが呼び出された時点ですでに実行されているので、loop.run_until_completeも使用できません。

私はmail.python.orgにあるものと非常に似た問題を記述している誰かを見つけました。そのスレッドに投稿されたソリューションは、関数は、ループがすでに実行された場合にはコルーチンオブジェクトを返す作ることだった。

def aio_map(coro, iterable, loop=None): 
    if loop is None: 
     loop = asyncio.get_event_loop() 

    coroutines = map(coro, iterable) 
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop) 

    if loop.is_running(): 
     return coros 
    else: 
     return loop.run_until_complete(coros) 

私はPyRx(Python実装で働いているように、これは、私のために可能ではありません反応性フレームワークの)とsend_requestのみ戻り値は破棄されますを意味し、自分のコードには使用できませんRxの観察可能なの加入者、と呼ばれる:サイドノートで

class AnonymousObserver(ObserverBase): 
    ... 
    def _on_next_core(self, value): 
     self._next(value) 

、私はわかりませんasyncioによく遭遇する問題の場合や、それが得られない場合は問題ですそれを使用するにはかなりイライラしています。 unhelpfullyちょうど私が強制的に破棄するよ別のコルーチンを返す 『待つ』の

void SendRequest(string request) 
{ 
    this.ws.Send(request).Wait(); 
    // Task.Delay(500).Wait(); // Uncomment If necessary 
} 

一方、asyncioのバージョン:(例えば)C#では、私がする必要があるでしょうすべては、おそらく次のようなものです。

更新

私が働くようで、この問題を回避する方法を見つけました。コマンドが実行された後、CLIが終了する前に非同期コールバックが実行されるので、これを次のように変更しました。

async def after_command(): 
    await comms.stop() 

...これまで:

今後の参考に、この問題に対する回答をお寄せいただきありがとうございます。私は他の状況でこのような回避策に頼ることができないかもしれませんが、CommunicationServiceのクライアントがタイミングの問題に心配する必要がないように、遅れをsend_requestの内部で実行する方が良いと思われます。ヴィンセントの質問に関しては

あなたのループの実行が別のスレッドでないか、に、send_requestいくつかのコールバックによって呼び出されますか?

すべてが同じスレッドで実行されます。コールバックによって呼び出されます。何が起こるかは、すべての私のコマンドを非同期コールバックを使用するように定義し、実行するとそれらのうちのいくつかがWebサービスにリクエストを送信しようとします。それらは非同期であるため、CLIの最上位にあるloop.run_until_completeへの呼び出しによって実行されるまでは実行されません。つまり、実行途中にループが実行され、リクエスト(send_requestへの間接的な電話による)。

アップデートは2

ここで「完了」コールバックを追加するヴィンセントの提案をベースとしたソリューションです。

新しいブール型フィールド_busyが、に追加され、通信アクティビティが発生しているかどうかを表します。

CommunicationService.send_request

は、要求を送信する前に、真 _busyを設定するように変更して、一度行って _busyリセットするために _ws.sendにコールバックを提供している:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 

    def callback(_): 
     self._busy = False 

    self._busy = True 
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback) 

CommunicationService.stopは今、このフラグを待つために実装される前に、偽に設定されることを進展:

async def stop(self) -> None: 
    """ 
    Terminate communications with TestCube Web Service. 
    """ 
    if self._listen_task is None or self._ws is None: 
     return 

    # Wait for comms activity to stop. 
    while self._busy: 
     await asyncio.sleep(0.1) 

    # Allow short delay after final request is processed. 
    await asyncio.sleep(0.1) 

    self._listen_task.cancel() 
    await asyncio.wait([self._listen_task, self._ws.close()]) 

    self._listen_task = None 
    self._ws = None 
    logger.info('Terminated connection to TestCube Web Service') 

これはあまりにも動作しているようですし、少なくともこの方法は、すべての通信タイミングロジックは内にカプセル化されていますクラスが必要です。ヴィンセントの提案に基づいて

アップデート3

よりよいソリューション。

self._busyの代わりにself._send_request_tasks = []があります。

send_request実装:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 

    task = asyncio.ensure_future(self._ws.send(request)) 
    self._send_request_tasks.append(task) 

stop実装:

self._send_request_tasks = set() 

スケジュールタスクがensure_futureを使用し、使用してクリーンアップ:

async def stop(self) -> None: 
    if self._listen_task is None or self._ws is None: 
     return 

    # Wait for comms activity to stop. 
    if self._send_request_tasks: 
     await asyncio.wait(self._send_request_tasks) 
    ... 
+0

を ''別のスレッドで、あなたのループの実行をい「ループはすでにそれが呼び出された時点で実行されているように私もloop.run_until_complete使用することはできません。」 、またはいくつかのコールバックによって呼び出される 'send_request'ですか? – Vincent

+0

すべてがシングルスレッドなので、コールバックによって呼び出されます。私は投稿を更新しました。 – Tagc

+1

'asyncio.ensure_future(self._ws.send(request))。add_done_callback(...)'を使用して、リクエストが送信された後にコールバックを実行するようスケジュールするのはどうですか? – Vincent

答えて

2

あなたはsetのタスクで使用することができますadd_done_callback

def send_request(self, request: str) -> None: 
    task = asyncio.ensure_future(self._ws.send(request)) 
    self._send_request_tasks.add(task) 
    task.add_done_callback(self._send_request_tasks.remove) 

とタスクのsetが完了するのを待つ:

async def stop(self): 
    if self._send_request_tasks: 
     await asyncio.wait(self._send_request_tasks) 
+0

[documentation](https://docs.python.org/3/library/asyncio-task.html)による「asyncio.wait」を持つ小さな警告は、「シーケンス* future *は空であってはいけません」ということですので、セットを空でないことをチェックするif文の中に 'asyncio.wait'を入れ子にするだけです(私の投稿で行ったように)。そうでなければ、これは素晴らしいとそれは動作するようです。 – Tagc

+0

@Tagcあなたが正しいです、私の編集を参照してください。 – Vincent

0

を考えると、あなたが効果的にawaitを自分で実装するyield fromキーワードを使用することができ、非同期関数内じゃないということ。次のコードは、将来のリターンまでブロックします:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 
    future = asyncio.ensure_future(self._ws.send(request)) 
    yield from future.__await__() 
関連する問題