2017-09-25 3 views
1

私は、PyQt5を使用するクライアントとasyncioの周りに構築されたwebsocketsモジュールで作業してきました。私は以下のコードのようなものが動作すると思ったが、私はラインの編集ボックスに入力をクリックするまで、(サーバーからの)着信データがGUIで更新されていないことがわかった。これらの着信メッセージは、GUIへの更新のためのパルスを設定するためのものであり、更新に使用されるデータを運ぶ。クマシはこれに接近するより良い方法ですか? btw、私はこのコードのいくつかの他の側面のためのプロセスを使用するので、私はそれを(この時点で)過度に考慮しないでください。 これは、Python 3.6、PyQt5.6(以上)、およびpipで現在インストールされているWebソケットのバージョンです。 https://github.com/aaugustin/websockets最後にwebsockets、asyncio、PyQt5を一緒に使用します。クマシュは必要ですか?

クライアント:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 
import asyncio 
import websockets 
import sys 
import time 
from multiprocessing import Process, Pipe, Queue 
from PyQt5 import QtCore, QtGui, QtWidgets 

class ComBox(QtWidgets.QDialog): 
    def __init__(self): 
     QtWidgets.QDialog.__init__(self) 
     self.verticalLayout = QtWidgets.QVBoxLayout(self) 
     self.groupBox = QtWidgets.QGroupBox(self) 
     self.groupBox.setTitle("messages from beyond") 
     self.gridLayout = QtWidgets.QGridLayout(self.groupBox) 
     self.label = QtWidgets.QLabel(self.groupBox) 
     self.gridLayout.addWidget(self.label, 0, 0, 1, 1) 
     self.verticalLayout.addWidget(self.groupBox) 
     self.lineEdit = QtWidgets.QLineEdit(self) 
     self.verticalLayout.addWidget(self.lineEdit) 
     self.lineEdit.editingFinished.connect(self.enterPress) 

    @QtCore.pyqtSlot() 
    def enterPress(self): 
     mytext = str(self.lineEdit.text()) 
     self.inputqueue.put(mytext) 

    @QtCore.pyqtSlot(str) 
    def updategui(self, message): 
     self.label.setText(message) 

class Websocky(QtCore.QThread): 
    updatemaingui = QtCore.pyqtSignal(str) 
    def __init__(self): 
     super(Websocky, self).__init__() 
    def run(self): 
     while True: 
      time.sleep(.1) 
      message = self.outputqueue.get() 
      try: 
       self.updatemaingui[str].emit(message) 
      except Exception as e1: 
       print("updatemaingui problem: {}".format(e1)) 

async def consumer_handler(websocket): 
    while True: 
     try: 
      message = await websocket.recv() 
      outputqueue.put(message) 
     except Exception as e1: 
      print(e1) 

async def producer_handler(websocket): 
    while True: 
     message = inputqueue.get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

async def handler(): 
    async with websockets.connect('ws://localhost:8765') as websocket: 
     consumer_task = asyncio.ensure_future(consumer_handler(websocket)) 
     producer_task = asyncio.ensure_future(producer_handler(websocket)) 
     done, pending = await asyncio.wait(
      [consumer_task, producer_task], 
      return_when=asyncio.FIRST_COMPLETED,) 
     for task in pending: 
      task.cancel() 

def start_websockets(): 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(handler()) 

inputqueue = Queue() 
outputqueue = Queue() 

app = QtWidgets.QApplication(sys.argv) 
comboxDialog = ComBox() 
comboxDialog.inputqueue = inputqueue 
comboxDialog.outputqueue = outputqueue 
comboxDialog.show() 

webster = Websocky() 
webster.outputqueue = outputqueue 
webster.updatemaingui[str].connect(comboxDialog.updategui) 
webster.start() 

p2 = Process(target=start_websockets) 
p2.start() 

sys.exit(app.exec_()) 

サーバー:

#!/usr/bin/env python3 
# -*- coding: utf-8 -*- 
import asyncio 
import time 
import websockets 

# here we'll store all active connections to use for sending periodic messages 
connections = [] 


#@asyncio.coroutine 
async def connection_handler(connection, path): 
    connections.append(connection) # add connection to pool 
    while True: 
     msg = await connection.recv() 
     if msg is None: # connection lost 
      connections.remove(connection) # remove connection from pool, when client disconnects 
      break 
     else: 
      print('< {}'.format(msg)) 

#@asyncio.coroutine 
async def send_periodically(): 
    while True: 
     await asyncio.sleep(2) # switch to other code and continue execution in 5 seconds 
     for connection in connections: 
      message = str(round(time.time())) 
      print('> Periodic event happened.') 
      await connection.send(message) # send message to each connected client 

start_server = websockets.serve(connection_handler, 'localhost', 8765) 
asyncio.get_event_loop().run_until_complete(start_server) 
asyncio.ensure_future(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages 
asyncio.get_event_loop().run_forever() 

答えて

0

まもなく、私は問題を認識し、この質問を投稿した後。 producer_handler関数内の行がブロックされています。これは、キューに何かが見えるまで、そのプロセスのすべてをハングする非同期関数でなければなりません。私の回避策は、asyncio互換のキューを提供するaioprocessingモジュールを使用することでした。したがって、これは次のようになります。

import aioprocessing 

async def producer_handler(websocket): 
    while True: 
     message = await inputqueue.coro_get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

inputqueue = aioprocessing.AioQueue() 

補助処理モジュールは、いくつかの素晴らしいオプションとドキュメントを提供します。そして、この場合は、問題のためのやや単純な解決策です。 https://github.com/dano/aioprocessing 私の質問に答えるには:いいえ、あなたはこの種のものにクマシを使う必要はありません。

関連する問題