2017-04-17 5 views
0

subprocess.Popen()を使ってCプログラムを実行し、そのストリームをリアルタイムで出力してクライアントに送信します。ただし、出力はバッファリングされ、実行の最後に一緒に送信されます(ブロッキングの性質)。リアルタイムで出力を受け取って、Twisted Autobahnで即時に送信するにはどうすればいいですか?Twisted [autobahn] websocketサーバーでリアルタイムに出力をストリームする方法は?

def onConnect(self, request): 
    try: 
     self.cont_name = ''.join(random.choice(string.lowercase) for i in range(5)) 
     self.file_name = self.cont_name 
     print("Connecting...") 
    except Exception: 
     print("Failed"+str(Exception))  

def onOpen(self): 
    try: 
     print("open") 
    except Exception: 
     print("Couldn't create container") 

def onMessage(self, payload,isBinary=False): 
     cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
     a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1) 
     for line in iter(a.stdout.readline, b''): 
      line = line.encode('utf8') 
      self.sendMessage(line) 

def onClose(self, wasClean, code, reason): 
    try: 
     print("Closed container...") 
    except Exception: 
     print(str(Exception))  

サブプロセスを使用してdockerコマンドを実行すると、cコードの出力全体が発生するのではなく、すぐに返されます。例:

これをコンテナで実行した後、プログラムはクライアントに3秒後に 'Rounded'を返します。ただし、実行終了時にすべての丸められた送信を終了します。

+0

あなたの質問は、あなたが持っている問題がどこにあるのか、それを過ぎるのに役立つだろうかということは、広くは分かりませんが、いくつかのサンプルコードを含めることができますか?好ましくは、http://sscce.org/の行に沿って何かがあります。 –

+0

@ Jean-PaulCalderone私はいくつかのコードを追加しました。それを見てください。ありがとう –

答えて

0

不正行為は、この方法でループから来る:

def onMessage(self, payload,isBinary=False): 
     cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
     a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1) 
     for line in iter(a.stdout.readline, b''): 
      line = line.encode('utf8') 
      self.sendMessage(line) 

ツイストは、協調マルチタスクシステムです。デフォルトでは、全てが単一のスレッド( "reactor thread")で実行されます。つまり、すべてのコードが周期的に(通常はすばやく)制御を放棄して、他のコード(アプリケーションコードまたはツイストされた実装コード)が実行されるようにする必要があります。この関数のループは、子プロセスから読み込んで、Autobahn APIを使用してデータを繰り返し送信します。何度も制御を放棄しません。

Popenオブジェクトからの読み取りをブロックすると、問題が発生することもあります。読み込みがどれだけ長くブロックされるかはわかりません。したがって、リアクタースレッドで他のコードが実行されないようにどれくらいの時間がかかるか分かりません。 、

def onMessage(self, payload,isBinary=False): 
    cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
    popen_in_thread(
     lambda line: reactor.callFromThread(
      lambda: self.sendMessage(line.encode("utf-8")) 
     ), 
     [cmd], shell=True, stdout=subprocess.PIPE, bufsize=1 
    ) 

def popen_in_thread(callback, *args, **kwargs): 
    def threaded(): 
     a = subprocess.Popen(*args, **kwargs) 
     for line in iter(a.stdout.readline, b''): 
      callback(line) 
    reactor.callInThread(threaded) 

それとも、より良いツイスト独自のプロセス・サポートを使用します:あなたは、あなたのpopenのは、彼らが原子炉のスレッドをブロックすることはありません新しいスレッドに読み込み、移動することができますいずれか

def onMessage(self, payload,isBinary=False): 
    class ProcessLinesToMessages(ProcessProtocol): 
     def outReceived(self, output): 
      buf = self.buf + output 
      lines = buf.splitlines() 
      self.buf = lines.pop() 
      for line in lines: 
       self.sendMessage(line.encode("utf-8")) 
      while True: 
       line, self.buf = self.buf.splitline 
    reactor.spawnProcess(
     ProcessLinesToMessages(), 
     "docker", 
     [ 
      "docker", 
      "exec", 
      self.cont_name, 
      "/tmp/./ + self.file_name, 
     ], 
    ) 

は(どちらのバージョンでテストされ、うまくいけば、そのアイデアは明らかです)

+0

はこの場合に使用できますか? –

+0

Deferredは、将来の単一の結果を表します。進行中のプロセスからデータをストリーミングする必要があります。これらの2つの事はよくうまくいっていません。しかし、このサンプルコードで行ったように、コールバックを手作業で作る代わりにというチューブを使用することもできます。 –

関連する問題