2017-02-18 12 views
0

ので、同様に、私は、stdoutとstderrをunbuffersクラスの機能を持っていますのWebSocketからの着信時に峠非同期websocket.send()

class Unbuffered: 
    def __init__(self, stream): 
     self.stream = stream 
    def write(self, data): 
     data = data.strip() 
     if data.startswith("INFO: "): 
      data = data[6:] 
     if '[' in data: 
      progress = re.compile(r"\[(\d+)/(\d+)\]") 
      data = progress.match(data) 
      total = data.group(2) 
      current = data.group(1) 
      data = '{0}/{1}'.format(current, total) 
     if data.startswith("ERROR: "): 
      data = data[7:] 
     self.stream.write(data + '\n') 
     self.stream.flush() 
    def __getattr__(self, attr): 
     return getattr(self.stream, attr) 

出力はProcessPoolExecutorの関数の実行からです到着。

出力をコンソールに出力し、私のwebsocketクライアントに送信します。私はバッファリングされていないasyncingを試み、それにwebsocketインスタンスを渡しましたが運はありません。


UPDATE:このようなrun()の必需品と私のWebSocket handler()外観は何か:

def run(url, path): 
    logging.addLevelName(25, "INFO") 
    fmt = logging.Formatter('%(levelname)s: %(message)s') 
    #---- 
    output.progress_stream = Unbuffered(sys.stderr) 
    stream = Unbuffered(sys.stdout) 
    #---- 
    level = logging.INFO 
    hdlr = logging.StreamHandler(stream) 
    hdlr.setFormatter(fmt) 
    log.addHandler(hdlr) 
    log.setLevel(level) 
    get_media(url, opt) 

async def handler(websocket, path): 
    while True: 
     inbound = json.loads(await websocket.recv()) 
     if inbound is None: 
      break 
     url = inbound['url'] 
     if 'path' in inbound: 
      path = inbound['path'].rstrip(os.path.sep) + os.path.sep 
     else: 
      path = os.path.expanduser("~") + os.path.sep 
     # blah more code 
     while inbound != None: 
      await asyncio.sleep(.001) 
      await loop.run_in_executor(None, run, url, path) 

run()handler()Unbufferedは互いに分離されています。

+0

いくつかのコメント: 'data.start()'の代わりに 'data = data.strip()'とし、後で 'data.startswith(" INFO: "):data = data [6:]'を試してください。 're.split()'の代わりに 're.match(r" \ [([^ \]])\] ")'を使ってみてください。 – Udi

+0

もちろん、もちろんです!ありがとう、私はこれを編集します。 – fivethous

+0

'run()'は 'Unbuffered'インスタンスを作成しますか? 'run()'のrelavant部分をペーストし、 'Unbuffered'がどのようにどこで使われているのかを表示できますか? – Udi

答えて

0

get_media()を別のスレッドで実行する代わりにasyncioを使用して書き換えることが最適です。それ以外の場合は、例えば、通常のスレッドとコルーチンとの間で通信するためのいくつかのオプションは、ソケットペアを使用して、あります

import asyncio 
import socket 
import threading 
import time 

import random 


# threads stuff 
def producer(n, writer): 
    for i in range(10): 
     # print("sending", i) 
     writer.send("message #{}.{}\n".format(n, i).encode()) 
     time.sleep(random.uniform(0.1, 1)) 


def go(writer): 
    threads = [threading.Thread(target=producer, args=(i + 1, writer,)) 
       for i in range(5)] 
    for t in threads: 
     t.start() 
    for t in threads: 
     t.join() 
    writer.send("bye\n".encode()) 


# asyncio coroutines 
async def clock(): 
    for i in range(11): 
     print("The time is", i) 
     await asyncio.sleep(1) 


async def main(reader): 
    buffer = "" 
    while True: 
     buffer += (await loop.sock_recv(reader, 10000)).decode() 
     # print(len(buffer)) 
     while "\n" in buffer: 
      msg, _nl, buffer = buffer.partition("\n") 
      print("Got", msg) 
      if msg == "bye": 
       return 


reader, writer = socket.socketpair() 
reader.setblocking(False) 
threading.Thread(target=go, args=(writer,)).start() 
# time.sleep(1.5) # socket is buffering 
loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait([clock(), main(reader)])) 
loop.close() 

ます。また、このサードパーティのスレッド+ asyncio互換のキューを試すことができます:janus