2016-11-21 2 views
1

私はそれはそうasyncioワークフロー...PythonのWebSocketをは、asyncio、キュー - 生産者と消費者のメソッドを持つカスタムサーバークラスとハンドラクラス

EDITを理解する問題を抱えて - asyncio.Queueを組み込むためにコードを変更:

#!/usr/bin/env python 

import asyncio 
import websockets 
import threading 


class WSServer: 

    def serve_forever(self): 
     loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(loop) 
     start_server = websockets.serve(self.handler, '127.0.0.1', 5678) 
     asyncio.get_event_loop().run_until_complete(start_server) 
     asyncio.get_event_loop().run_forever() 

    async def handler(self, websocket, path): 
     loop = asyncio.get_event_loop() 
     master = MyClass(websocket) 
     while True: 
      listener_task = asyncio.ensure_future(master.get_message()) 
      producer_task = asyncio.ensure_future(master.produce()) 
      done, pending = await asyncio.wait(
       [listener_task, producer_task], 
       return_when=asyncio.FIRST_COMPLETED) 

      if listener_task in done: 
       await master.consume() 
      else: 
       listener_task.cancel() 

      if producer_task in done: 
       msg_to_send = producer_task.result() 
       await master.send_message(msg_to_send) 
      else: 
       producer_task.cancel() 


class MyClass: 

    incoming = asyncio.Queue() 
    outgoing = asyncio.Queue() 

    def __init__(self, websocket): 
     self.ws = websocket 

    async def get_message(self): 
     msg_in = await self.ws.recv() 
     await self.incoming.put(msg_in) 

    async def send_message(self, message): 
     await self.ws.send(message) 

    async def consume(self): 
     msg_to_consume = await self.incoming.get() 
     # do something 'consuming' :) 
     consume_output = msg_to_consume 
     await self.outgoing.put(consume_output) 

    async def produce(self): 
     msg_out = await self.outgoing.get() 
     return msg_out 


if __name__ == '__main__': 
    s = WSServer() 
    t = threading.Thread(target=s.serve_forever) 
    t.daemon = True 
    t.start() 
    while True: 
     asyncio.sleep(5) 

MyClass.consumeを変更する()奇妙な行動を(別のものではない笑で、一台のマシン上で)働いている:

async def consume(self): 
    msg_to_consume = await self.incoming.get() 
    # do something 'consuming' :) 
    consume_output = msg_to_consume 
    await self.outgoing.put('THIS WILL NOT GET INTO QUEUE???!!!') 
    print('Outgoing empty 1: ' + str(self.outgoing.empty())) 
    # And this will get into queue O.o 
    await self.outgoing.put(consume_output) 
    print('Outgoing empty 2: ' + str(self.outgoing.empty())) 

私が原因でCA後2つの待ち受けを持っていますself.outgoing.put()初めて、self.outgoingキューはまだ空です!私は再びそれを呼び出すときにのみ、アイテムを受け取っているようです...任意のアイデア?

その他機械はただのエラーがスローされます。

Exception in connection handler 
Traceback (most recent call last): 
    File "/usr/lib/python3/dist-packages/websockets/server.py", line 78, in handler 
    yield from self.ws_handler(self, path) 
    File "test2.py", line 33, in handler 
    msg_to_send = producer_task.result() 
    File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result 
    raise self._exception 
    File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step 
    result = coro.throw(exc) 
    File "test2.py", line 66, in produce 
    msg_out = await self.outgoing.get() 
    File "/usr/lib/python3.5/asyncio/queues.py", line 168, in get 
    yield from getter 
    File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__ 
    yield self # This tells Task to wait for completion. 
RuntimeError: Task <Task pending coro=<MyClass.produce() running at test2.py:66> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:414]> got Future <Future pending> attached to a different loop 

ORIGINAL:

私は私がしようとしています何:)

#!/usr/bin/env python 

import asyncio 
import websockets 
import threading 


class WSServer: 

    def serve_forever(self): 
     loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(loop) 
     start_server = websockets.serve(self.handler, '127.0.0.1', 5678) 
     asyncio.get_event_loop().run_until_complete(start_server) 
     asyncio.get_event_loop().run_forever() 

    async def handler(self, websocket, path): 
     loop = asyncio.get_event_loop() 
     master = MyClass(websocket) 
     while True: 
      listener_task = asyncio.ensure_future(master.get_message()) 
      producer_task = asyncio.ensure_future(master.produce()) 
      done, pending = await asyncio.wait(
       [listener_task, producer_task], 
       return_when=asyncio.FIRST_COMPLETED) 

      if listener_task in done: 
       await master.consume() 
      else: 
       listener_task.cancel() 

      if producer_task in done: 
       if producer_task.result(): 
        await master.send_message() 
      else: 
       producer_task.cancel() 


class MyClass: 

    incoming = [] 
    outgoing = [] 

    def __init__(self, websocket): 
     self.ws = websocket 

    async def get_message(self): 
     self.incoming.append(self.ws.recv()) 

    async def send_message(self): 
     self.ws.send(self.outgoing.pop(0)) 

    async def consume(self): 
     self.outgoing.append(self.incoming.pop(0)) 

    async def produce(self): 
     if self.outgoing: 
      return True 


if __name__ == '__main__': 
    s = WSServer() 
    t = threading.Thread(target=s.serve_forever) 
    t.daemon = True 
    t.start() 
    while True: 
     asyncio.sleep(5) 

を意図したように、明らかに機能していない、このコードを、持っています達成:

  1. メインスレッドとは別のスレッドでWSServerインスタンスを実行している(WSServer.serve_foreverで正常に動作している)

  2. WSServer.handlerメソッドで接続されているクライアントごとに、受信メッセージ用と、出て行く。

  3. 着信がMyClass.get_message(から充填されなければならない) - 基本的websocket.recv()

  4. 送信がMyClass.consume()から充填することができる - 応答としてだけでなく、外部から充填することができますこのコードの範囲。それMyClass.send_message()

経由Myclass.outgoingで何か、プロセスがあるMyClass.consume()、経由MyClass.incomingで何か、それを処理がある場合

  • 私はないですMyClass.produce()については、実際には何も生成する必要はないので、いくつかのメッセージがあるときには送信メッセージを送信してください。

    Need help on producer and consumer thread in python

    asyncio queue consumer coroutine

    :私はここに同様のスレッドを見つけましたが、その例と問題は正直に私の理解の外にある)も

    をasycnio.Queueを(使用して、いくつかのコードを見てきましたここで正しいアプローチは何でしょうか?

  • +0

    websocketのコルーチンを待つ必要があります: 'await self.ws.recv()'/'await self.ws.send([...])'。 [この例](https://github.com/aaugustin/websockets/blob/master/example/server.py)を参照してください。 – Vincent

    +0

    私は標準的なWebソケットの例を見てきましたが、それほど変化はありませんでした。それを呼び出すrecv/sendのパートまたは親の関数を待っている場合、それは同じではありませんか?ああ、あなたは非同期関数の中で待つことはできません。 –

    +0

    コルーチンを呼び出すと実行されません。そのためには 'await'を使わなければなりません。この理由から、 'True:asyncio.sleep(5)'のコードの次の部分は、実際にはスリープしませんが、代わりにそれらを使用せずに 'sleep'コルーチンをたくさん作成します。 – Vincent

    答えて

    0

    pythonのチャットルームの助けを借りて答えを見つけました。

    class MyClass: 
    
        def __init__(self, websocket): 
         self.ws = websocket 
         self.incoming = asyncio.Queue() 
         self.outgoing = asyncio.Queue() 
    

    キューは、クラス自体ではなく、クラスのインスタンスに対して定義する必要があります。

    関連する問題