私はそれはそうasyncioワークフロー...PythonのWebSocketをは、asyncio、キュー - 生産者と消費者のメソッドを持つカスタムサーバークラスとハンドラクラス
EDITを理解する問題を抱えて - asyncio.Queueを組み込むためにコードを変更:
#!/usr/bin/env python
import asyncio
import websockets
import threading
class WSServer:
def serve_forever(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
async def handler(self, websocket, path):
loop = asyncio.get_event_loop()
master = MyClass(websocket)
while True:
listener_task = asyncio.ensure_future(master.get_message())
producer_task = asyncio.ensure_future(master.produce())
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED)
if listener_task in done:
await master.consume()
else:
listener_task.cancel()
if producer_task in done:
msg_to_send = producer_task.result()
await master.send_message(msg_to_send)
else:
producer_task.cancel()
class MyClass:
incoming = asyncio.Queue()
outgoing = asyncio.Queue()
def __init__(self, websocket):
self.ws = websocket
async def get_message(self):
msg_in = await self.ws.recv()
await self.incoming.put(msg_in)
async def send_message(self, message):
await self.ws.send(message)
async def consume(self):
msg_to_consume = await self.incoming.get()
# do something 'consuming' :)
consume_output = msg_to_consume
await self.outgoing.put(consume_output)
async def produce(self):
msg_out = await self.outgoing.get()
return msg_out
if __name__ == '__main__':
s = WSServer()
t = threading.Thread(target=s.serve_forever)
t.daemon = True
t.start()
while True:
asyncio.sleep(5)
MyClass.consumeを変更する()奇妙な行動を(別のものではない笑で、一台のマシン上で)働いている:
async def consume(self):
msg_to_consume = await self.incoming.get()
# do something 'consuming' :)
consume_output = msg_to_consume
await self.outgoing.put('THIS WILL NOT GET INTO QUEUE???!!!')
print('Outgoing empty 1: ' + str(self.outgoing.empty()))
# And this will get into queue O.o
await self.outgoing.put(consume_output)
print('Outgoing empty 2: ' + str(self.outgoing.empty()))
私が原因でCA後2つの待ち受けを持っていますself.outgoing.put()
初めて、self.outgoingキューはまだ空です!私は再びそれを呼び出すときにのみ、アイテムを受け取っているようです...任意のアイデア?
その他機械はただのエラーがスローされます。
Exception in connection handler
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/websockets/server.py", line 78, in handler
yield from self.ws_handler(self, path)
File "test2.py", line 33, in handler
msg_to_send = producer_task.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "test2.py", line 66, in produce
msg_out = await self.outgoing.get()
File "/usr/lib/python3.5/asyncio/queues.py", line 168, in get
yield from getter
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<MyClass.produce() running at test2.py:66> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:414]> got Future <Future pending> attached to a different loop
ORIGINAL:
私は私がしようとしています何:)
#!/usr/bin/env python
import asyncio
import websockets
import threading
class WSServer:
def serve_forever(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
async def handler(self, websocket, path):
loop = asyncio.get_event_loop()
master = MyClass(websocket)
while True:
listener_task = asyncio.ensure_future(master.get_message())
producer_task = asyncio.ensure_future(master.produce())
done, pending = await asyncio.wait(
[listener_task, producer_task],
return_when=asyncio.FIRST_COMPLETED)
if listener_task in done:
await master.consume()
else:
listener_task.cancel()
if producer_task in done:
if producer_task.result():
await master.send_message()
else:
producer_task.cancel()
class MyClass:
incoming = []
outgoing = []
def __init__(self, websocket):
self.ws = websocket
async def get_message(self):
self.incoming.append(self.ws.recv())
async def send_message(self):
self.ws.send(self.outgoing.pop(0))
async def consume(self):
self.outgoing.append(self.incoming.pop(0))
async def produce(self):
if self.outgoing:
return True
if __name__ == '__main__':
s = WSServer()
t = threading.Thread(target=s.serve_forever)
t.daemon = True
t.start()
while True:
asyncio.sleep(5)
を意図したように、明らかに機能していない、このコードを、持っています達成:
メインスレッドとは別のスレッドでWSServerインスタンスを実行している(WSServer.serve_foreverで正常に動作している)
WSServer.handlerメソッドで接続されているクライアントごとに、受信メッセージ用と、出て行く。
着信がMyClass.get_message(から充填されなければならない) - 基本的websocket.recv()
送信がMyClass.consume()から充填することができる - 応答としてだけでなく、外部から充填することができますこのコードの範囲。それMyClass.send_message()
経由Myclass.outgoingで何か、プロセスがあるMyClass.consume()、経由MyClass.incomingで何か、それを処理がある場合
私はないですMyClass.produce()については、実際には何も生成する必要はないので、いくつかのメッセージがあるときには送信メッセージを送信してください。
Need help on producer and consumer thread in python
asyncio queue consumer coroutine
:私はここに同様のスレッドを見つけましたが、その例と問題は正直に私の理解の外にある)も
をasycnio.Queueを(使用して、いくつかのコードを見てきましたここで正しいアプローチは何でしょうか?
websocketのコルーチンを待つ必要があります: 'await self.ws.recv()'/'await self.ws.send([...])'。 [この例](https://github.com/aaugustin/websockets/blob/master/example/server.py)を参照してください。 – Vincent
私は標準的なWebソケットの例を見てきましたが、それほど変化はありませんでした。それを呼び出すrecv/sendのパートまたは親の関数を待っている場合、それは同じではありませんか?ああ、あなたは非同期関数の中で待つことはできません。 –
コルーチンを呼び出すと実行されません。そのためには 'await'を使わなければなりません。この理由から、 'True:asyncio.sleep(5)'のコードの次の部分は、実際にはスリープしませんが、代わりにそれらを使用せずに 'sleep'コルーチンをたくさん作成します。 – Vincent