私は、WebサービスとWebソケットを介して通信するPythonベースのCLIを作成しようとしています。私が遭遇している1つの問題は、CLIによってWebサービスに対して行われた要求が断続的に処理されないことです。私はイベントループが既に実行されている場合、コルーチンがメソッド内で同期して完了するのを待つ方法はありますか?
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
を:ソケットが閉じたWebサービスからのログを見ると、私は問題が頻繁にこれらの要求は(後であってもまたは)が同時に行われていることに起因していることを見ることができますCLI私は、Webサービスとのすべての直接通信を処理するクラスCommunicationService
を定義します。内部的には、websockets
パッケージを使用して通信を処理します。それ自体はasyncio
の上に構築されています。
CommunicationService
は、リクエストを送信するための次のメソッドが含まれます。
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
を...以前、別の方法で開くws
がWebSocketのです:私が欲しいもの
self._ws = await websockets.connect(websocket_address)
は待つことができるようにすることです将来はasyncio.ensure_future
によって返され、必要に応じて、WebSocketが閉じられる前にWebサービスが要求を処理する時間を与えるために、少し後にスリープします。
ただし、send_request
は同期方式であるため、これらの先物を単にawait
とすることはできません。それを返すコルーチンのオブジェクトを待つことは何もないので、非同期にすることは無意味になります。ループが呼び出された時点ですでに実行されているので、loop.run_until_complete
も使用できません。
私はmail.python.orgにあるものと非常に似た問題を記述している誰かを見つけました。そのスレッドに投稿されたソリューションは、関数は、ループがすでに実行された場合にはコルーチンオブジェクトを返す作ることだった。
def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
私はPyRx(Python実装で働いているように、これは、私のために可能ではありません反応性フレームワークの)とsend_request
のみ戻り値は破棄されますを意味し、自分のコードには使用できませんRxの観察可能なの加入者、と呼ばれる:サイドノートで
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
、私はわかりませんasyncio
によく遭遇する問題の場合や、それが得られない場合は問題ですそれを使用するにはかなりイライラしています。 unhelpfullyちょうど私が強制的に破棄するよ別のコルーチンを返す 『待つ』の
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
一方、asyncio
のバージョン:(例えば)C#では、私がする必要があるでしょうすべては、おそらく次のようなものです。
更新
私が働くようで、この問題を回避する方法を見つけました。コマンドが実行された後、CLIが終了する前に非同期コールバックが実行されるので、これを次のように変更しました。
async def after_command():
await comms.stop()
...これまで:
今後の参考に、この問題に対する回答をお寄せいただきありがとうございます。私は他の状況でこのような回避策に頼ることができないかもしれませんが、CommunicationService
のクライアントがタイミングの問題に心配する必要がないように、遅れをsend_request
の内部で実行する方が良いと思われます。ヴィンセントの質問に関しては
:
あなたのループの実行が別のスレッドでないか、に、send_requestいくつかのコールバックによって呼び出されますか?
すべてが同じスレッドで実行されます。コールバックによって呼び出されます。何が起こるかは、すべての私のコマンドを非同期コールバックを使用するように定義し、実行するとそれらのうちのいくつかがWebサービスにリクエストを送信しようとします。それらは非同期であるため、CLIの最上位にあるloop.run_until_complete
への呼び出しによって実行されるまでは実行されません。つまり、実行途中にループが実行され、リクエスト(send_request
への間接的な電話による)。
アップデートは2
ここで「完了」コールバックを追加するヴィンセントの提案をベースとしたソリューションです。
新しいブール型フィールド_busy
が、に追加され、通信アクティビティが発生しているかどうかを表します。
CommunicationService.send_request
_busy
を設定するように変更して、一度行って
_busy
リセットするために
_ws.send
にコールバックを提供している:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
CommunicationService.stop
は今、このフラグを待つために実装される前に、偽に設定されることを進展:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
これはあまりにも動作しているようですし、少なくともこの方法は、すべての通信タイミングロジックは内にカプセル化されていますクラスが必要です。ヴィンセントの提案に基づいて
アップデート3
よりよいソリューション。
self._busy
の代わりにself._send_request_tasks = []
があります。
新send_request
実装:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
新stop
実装:
self._send_request_tasks = set()
スケジュールタスクがensure_future
を使用し、使用してクリーンアップ:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
を ''別のスレッドで、あなたのループの実行をい「ループはすでにそれが呼び出された時点で実行されているように私もloop.run_until_complete使用することはできません。」 、またはいくつかのコールバックによって呼び出される 'send_request'ですか? – Vincent
すべてがシングルスレッドなので、コールバックによって呼び出されます。私は投稿を更新しました。 – Tagc
'asyncio.ensure_future(self._ws.send(request))。add_done_callback(...)'を使用して、リクエストが送信された後にコールバックを実行するようスケジュールするのはどうですか? – Vincent