2016-09-06 25 views

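I can't figure out how to cancel a long-running asyncio task

I'm developing a CLI that interacts with a web service. When run, it establishes communication, sends requests, receives and processes the responses, and then terminates. I'm using coroutines in various parts of my code, with asyncio driving them. What I want is to perform all of these steps and then have every coroutine terminate cleanly at the end (that is, in a way that doesn't make asyncio complain). Unfortunately, I'm finding asyncio much harder to use and understand than asynchrony in other languages such as C#.

I define a class that handles all direct communication with the web service over a WebSocket connection: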

import asyncio
import logging

import websockets


class CommunicationService(object):
    def __init__(self):
        self._ws = None
        self._listen_task = None
        self._loop = asyncio.get_event_loop()

    ...

    async def _listen_for_responses(self):
        logging.debug('Listening for responses')
        while True:
            message = await self._ws.recv()
            self.__received_response.on_next(message)

    def establish_communication(self, hostname: str, port: int) -> None:
        websocket_address = 'ws://{}:{}/'.format(hostname, port)

        self._ws = self._loop.run_until_complete(websockets.connect(websocket_address))
        self._listen_task = asyncio.ensure_future(self._listen_for_responses())

    def send_request(self, request: str) -> asyncio.Future:
        return asyncio.ensure_future(self._ws.send(request))

    def stop(self):
        if self._listen_task:
            self._loop.call_soon_threadsafe(self._listen_task.cancel)

        if self._ws and self._ws.open:
            self._ws.close()

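This class uses the websockets and RxPY libraries. When communication is established, an instance of this class creates a task that runs indefinitely, waiting for responses from the web service and publishing them on an RxPY subject.

I call CommunicationService.establish_communication in the main CLI method: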

def cli(context, hostname, port): 
    log_level = context.meta['click_log.core.logger']['level'] 
    _initialize_logging(log_level) 

    # Create a new event loop for processing commands asynchronously on. 
    loop = asyncio.new_event_loop() 
    loop.set_debug(log_level == logging.DEBUG) 
    asyncio.set_event_loop(loop) 

    # Establish communication with TestCube Web Service. 
    context.comms = CommunicationService() 
    context.comms.establish_communication(hostname, port) 
    ... 

Depending on the CLI arguments supplied, this may then invoke a subcommand callback, which I have implemented as a coroutine function. I then register a function to handle the result of the invoked subcommand, which will be either None or a coroutine object:

@cli.resultcallback()
@click.pass_context
def _handle_command_task(context, task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        context.comms.stop()
        loop.close()
        if result:
            print(result, end='')

My program works, but when I run the CLI (at INFO log level) I get the following output:

$ testcube relays.0.enabled false 
2016-09-06 12:33:51,157 [INFO ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364 
2016-09-06 12:33:51,219 [ERROR ] asyncio - Task was destroyed but it is pending! 
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future pending cb=[Task._wakeup()]>> 
2016-09-06 12:33:51,219 [ERROR ] asyncio - Task was destroyed but it is pending! 
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]> 
2016-09-06 12:33:51,219 [ERROR ] asyncio - Task was destroyed but it is pending! 
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]> 
Exception ignored in: <generator object Queue.get at 0x03643600> 
Traceback (most recent call last): 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed 
RuntimeError: Event loop is closed 

If I change CommunicationService.stop() to cancel the response-listening task directly (as opposed to scheduling the cancellation)...

self._listen_task.cancel() 
#self._loop.call_soon_threadsafe(self._listen_task.cancel) 

...then I get the following output instead:

...
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled>>
...

Here wait_for is now <Future cancelled> (as opposed to wait_for=<Future pending cb=[Task._wakeup()]>>). I don't understand how Task.cancel() is supposed to work: the output says the future was cancelled, yet the task is still pending. Do I need to do something special with tasks, e.g. wrap the code in try...except asyncio.CancelledError...?

In case it is useful, this is the DEBUG-level output of the same command:

$ testcube -v DEBUG relays.0.enabled false 
2016-09-06 12:48:10,145 [DEBUG ] asyncio - Using selector: SelectSelector 
2016-09-06 12:48:10,147 [DEBUG ] Rx - CurrentThreadScheduler.schedule(state=None) 
2016-09-06 12:48:10,147 [INFO ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364 
2016-09-06 12:48:10,153 [DEBUG ] asyncio - connect <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6> to ('127.0.0.1', 36364) 
2016-09-06 12:48:10,156 [DEBUG ] asyncio - poll took 0.000 ms: 1 events 
2016-09-06 12:48:10,163 [DEBUG ] asyncio - <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 56647), raddr=('127.0.0.1', 36364)> connected to 127.0.0.1:36364: (<_SelectorSocketTransport fd=608 read=polling write=<idle, bufsize=0>>, <websockets.client.WebSocketClientProtocol object at 0x03623BF0>) 
2016-09-06 12:48:10,198 [DEBUG ] asyncio - poll took 31.000 ms: 1 events 
2016-09-06 12:48:10,202 [DEBUG ] root - Connected using websocket address: ws://127.0.0.1:36364/ 
2016-09-06 12:48:10,202 [DEBUG ] Rx - CurrentThreadScheduler.schedule(state=None) 
2016-09-06 12:48:10,203 [DEBUG ] testcube.components.core - Using write handler 
2016-09-06 12:48:10,203 [DEBUG ] root - Listening for responses 
2016-09-06 12:48:10,205 [DEBUG ] testcube.comms - Sending request: {"op": "replace", "value": false, "path": "testcube.relays[0].enabled"} 
2016-09-06 12:48:10,208 [DEBUG ] websockets.protocol - client >> Frame(fin=True, opcode=1, data=b'{"op": "replace", "value": false, "path": "testcube.relays[0].enabled"}') 
2016-09-06 12:48:10,209 [DEBUG ] asyncio - Close <_WindowsSelectorEventLoop running=False closed=False debug=True> 
2016-09-06 12:48:10,222 [ERROR ] asyncio - Task was destroyed but it is pending! 
source_traceback: Object created at (most recent call last): 
    File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module> 
    load_entry_point('testcube', 'console_scripts', 'testcube')() 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main 
    cli(default_map=_get_default_settings()) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__ 
    return self.main(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main 
    rv = self.invoke(ctx) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke 
    Command.invoke(self, ctx) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke 
    return ctx.invoke(self.callback, **ctx.params) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke 
    return callback(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func 
    return f(get_current_context(), *args, **kwargs) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli 
    context.comms.establish_communication(hostname, port) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 48, in establish_communication 
    self._listen_task = asyncio.ensure_future(self._listen_for_responses()) 
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> created at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:48> 
2016-09-06 12:48:10,223 [ERROR ] asyncio - Task was destroyed but it is pending! 
source_traceback: Object created at (most recent call last): 
    File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module> 
    load_entry_point('testcube', 'console_scripts', 'testcube')() 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main 
    cli(default_map=_get_default_settings()) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__ 
    return self.main(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main 
    rv = self.invoke(ctx) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke 
    Command.invoke(self, ctx) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke 
    return ctx.invoke(self.callback, **ctx.params) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke 
    return callback(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func 
    return f(get_current_context(), *args, **kwargs) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli 
    context.comms.establish_communication(hostname, port) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 47, in establish_communication 
    self._ws = self._loop.run_until_complete(websockets.connect(websocket_address)) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete 
    self.run_forever() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever 
    self._run_once() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once 
    handle._run() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run 
    self._callback(*self._args) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\streams.py", line 238, in connection_made 
    self._stream_writer) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 633, in client_connected 
    self.worker_task = asyncio_ensure_future(self.run(), loop=self.loop) 
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:633> 
2016-09-06 12:48:10,223 [ERROR ] asyncio - Task was destroyed but it is pending! 
source_traceback: Object created at (most recent call last): 
    File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module> 
    load_entry_point('testcube', 'console_scripts', 'testcube')() 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main 
    cli(default_map=_get_default_settings()) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__ 
    return self.main(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main 
    rv = self.invoke(ctx) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1060, in invoke 
    return _process_result(sub_ctx.command.invoke(sub_ctx)) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1025, in _process_result 
    **ctx.params) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke 
    return callback(*args, **kwargs) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func 
    return f(get_current_context(), *args, **kwargs) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 190, in _handle_command_task 
    result = loop.run_until_complete(task) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete 
    self.run_forever() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever 
    self._run_once() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once 
    handle._run() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run 
    self._callback(*self._args) 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py", line 239, in _step 
    result = coro.send(None) 
    File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 34, in _listen_for_responses 
    message = await self._ws.recv() 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 280, in recv 
    self.messages.get(), loop=self.loop) 
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:280> 
Exception ignored in: <generator object Queue.get at 0x03641240> 
Traceback (most recent call last): 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon 
    File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed 
RuntimeError: Event loop is closed 

Answer


I figured it out: I need to define CommunicationService.stop() as follows:

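def stop(self):
    if self._listen_task is None or self._ws is None:
        return

    # cancel() only requests cancellation; the loop has to run again before
    # the task actually finishes, so wait for it and for the close handshake.
    self._listen_task.cancel()
    # Wrap close() in a task: newer asyncio.wait() no longer accepts bare coroutines.
    close_task = asyncio.ensure_future(self._ws.close(), loop=self._loop)
    self._loop.run_until_complete(asyncio.wait([self._listen_task, close_task]))

    self._listen_task = None
    self._ws = None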
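
For anyone wondering why the direct self._listen_task.cancel() in the question still left the task pending, here is a minimal sketch of the behaviour using only asyncio. The names listen_forever, demo and events are hypothetical stand-ins, not part of the CLI above, and the sleep replaces the real websocket recv(). It is meant to illustrate that cancel() merely requests cancellation and that the event loop must run once more to deliver CancelledError into the coroutine. The try...except is optional and only useful if the coroutine has cleanup to do.

```python
import asyncio


async def listen_forever(events):
    """Hypothetical stand-in for _listen_for_responses; never finishes by itself."""
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Optional cleanup hook. Re-raise so the task really ends up cancelled.
        events.append("listener unwound")
        raise


def demo():
    loop = asyncio.new_event_loop()
    events = []
    try:
        task = loop.create_task(listen_forever(events))
        # Run the loop briefly so the task starts and blocks on its first await.
        loop.run_until_complete(asyncio.sleep(0))

        task.cancel()
        before = task.cancelled()  # cancellation requested, not yet delivered

        # Running the loop again throws CancelledError into the coroutine.
        loop.run_until_complete(asyncio.wait([task]))
        after = task.cancelled()
        return before, after, events
    finally:
        loop.close()
```

Calling demo() should show the task as not yet cancelled right after cancel() and as cancelled once the loop has been run again, which appears to be what the run_until_complete(asyncio.wait(...)) call in stop() achieves.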

As documentation for anyone else who may be struggling with a related problem, the complete cleanup code now looks like this:

@cli.resultcallback()
@click.pass_context
def _handle_command_task(context, task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        context.comms.stop()
        loop.close()
        if result:
            print(result, end='')
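
If other tasks might still be pending when the loop is closed (the logs above also mention tasks created inside the websockets library), a more general fallback could look like the sketch below. This is an assumption on my part rather than something the CLI above uses: shutdown_loop, background and demo are hypothetical names, and the approach simply cancels every task still registered on the loop and lets each one unwind before calling close(). It relies on asyncio.all_tasks(), which needs Python 3.7 or later.

```python
import asyncio


def shutdown_loop(loop):
    """Cancel whatever is still pending on `loop`, let it unwind, then close."""
    pending = asyncio.all_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True keeps CancelledError from propagating here.
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    loop.close()


async def background():
    """Hypothetical long-running task that nobody stops explicitly."""
    await asyncio.sleep(3600)


def demo():
    loop = asyncio.new_event_loop()
    task = loop.create_task(background())
    # Run the loop briefly so the task starts before shutdown.
    loop.run_until_complete(asyncio.sleep(0))
    shutdown_loop(loop)
    return task.cancelled(), loop.is_closed()
```

In the callback above, shutdown_loop(loop) would take the place of the bare loop.close() call, after context.comms.stop() has closed the connection properly.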