2016-11-03 19 views
3

asynciowebsocketsとPython 3.5.2を使用して、基本WebSocketクライアントを実装しようとしています。asyncioとwebsocketコールバックを非同期にする

基本的には、connect_to_dealerをブロッキングコールにしたいが、別のスレッドでwebsocketメッセージを待つ。

いくつかのドキュメントを読んだあと(私はPythonにはほとんど経験がありません)、私はasyncio.ensure_future()がコルーチン(listen_for_message)を渡すことが道だと結論づけました。

は今、私は別のスレッドでlisten_for_messageを実行するために取得するが、コルーチンの中から、私は通話同期を作るためにawaitまたは任意の他のメカニズムを使用するように見えることはできません。私がそれをすると、実行は単純な場合でも、永遠に(ハングする)待つ。sleep

私が間違っていることを知りたいのですが。

async def listen_for_message(self, future, websocket): 
    while (True): 
     try: 
      await asyncio.sleep(1) # It hangs here 
      print('Listening for a message...') 
      message = await websocket.recv() # If I remove the sleep, hangs here 
      print("< {}".format(message)) 
      future.set_result(message) 
      future.done() 
     except websockets.ConnectionClosed as cc: 
      print('Connection closed') 
     except Exception as e: 
      print('Something happened') 

def handle_connect_message(self, future): 
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg 
    print(future.result) 

async def connect_to_dealer(self): 
    print('connect to dealer') 
    websocket = await websockets.connect('wss://mywebsocket')) 
    hello_message = await websocket.recv() 
    print("< {}".format(hello_message)) 
    # We need to parse the connection ID out of the message 
    connection_id = hello_message['connectionId'] 
    print('Got connection id {}'.format(connection_id)) 
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers) 
    if sub_response.status_code == 200: 
     print('Now we\'re observing traffic') 
    else: 
     print('Oops request failed with {code}'.format(code=sub_response.status_code)) 
    # Now we need to handle messages but continue with the regular execution 
    try: 
     future = asyncio.get_event_loop().create_future() 
     future.add_done_callback(self.handle_connect_message) 
     asyncio.ensure_future(self.listen_for_message(future, websocket)) 
    except Exception as e: 
     print(e) 

答えて

2

明示的な先物を使用する特別な理由はありますか?

asyncioの場合、coroutinesTasksの組み合わせを使用してほとんどの目的を達成できます。タスクは基本的にラップされたコルーチンであり、バックグラウンドで他の非同期コードとは独立して機能するので、明示的にフローを管理したり、他のコードでそれらを混乱させる必要はありません。

私はあなたの最終目標の完全わからないが、おそらくアプローチは、以下の詳述はあなたで動作するように何かを与える:

import asyncio 

async def listen_for_message(): 

    while True: 

     await asyncio.sleep(0) 

     try: 
      print('Listening for a message...') 
      message = await websocket.recv() 

      print("< {}".format(message)) 

     except websockets.ConnectionClosed as cc: 
      print('Connection closed') 

     except Exception as e: 
      print('Something happened') 


async def connect_to_dealer(): 

    print('connect to dealer') 
    websocket = await websockets.connect('wss://mywebsocket') 

    hello_message = await websocket.recv() 
    print("< {}".format(hello_message)) 

    # We need to parse the connection ID out of the message 
    connection_id = hello_message['connectionId'] 
    print('Got connection id {}'.format(connection_id)) 

    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
     user_id='username', connection_id=connection_id), headers=headers) 

    if sub_response.status_code == 200: 
     print('Now we\'re observing traffic') 
    else: 
     print('Oops request failed with {code}'.format(code=sub_response.status_code)) 


async def my_app(): 

    # this will block until connect_to_dealer() returns 
    websocket = await connect_to_dealer() 

    # start listen_for_message() in its own task wrapper, so doing it continues in the background 
    asyncio.ensure_future(listen_for_message(websocket)) 

    # you can continue with other code here that can now coexist with listen_for_message() 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(my_app()) 
    loop.run_forever() 
+1

shongoloこんにちは、私は明示的な先物で作業する必要はありませんでしたが、私はそれを読みますドキュメントに書いて、それを使っただけです。 私はあなたの提案のおかげで私の問題を解決しました、私はキーが 'connect_to_dealer()'コルーチン内から 'asyncio.ensure_future(listen_for_message(websocket))'を呼び出さないことだと思います。 – mdelolmo

+2

それは喜んで解決策を引き起こしました。 'loop.run_until_complete()'を使って全てのコルーチンを動かす必要があり、必要ならば(例えばタスクを使って) 'loop.run_forever()'を使ってコード内のどこかで 'ensure_future()'を呼び出すことはOKです。 – shongololo

関連する問題