サブプロセスを使用して呼び出す実行可能ファイルがあります。次に、後で別のスレッドに取り込まれるQueueからその値を読み取るスレッドを使用して、stdin経由でいくつかのデータを供給するつもりです。出力は、別のスレッドのstdoutパイプを使用して読み取られ、キュー内でソートされる必要があります。python:スレッド内のサブプロセス出力を読み取る
私の以前の研究からわかるように、Queueでスレッドを使用するのは良い方法です。
外部実行可能ファイルは、残念ながら、パイプラインされているすべての行に対してすぐに答えられないので、単純な書き込み、readlineサイクルはオプションではありません。実行可能ファイルはいくつかの内部マルチスレッディングを実装しています。使用できるようになるとすぐに出力が必要なので、追加のリーダースレッドが必要になります。ただ、各ライン(shuffleline.py)をシャッフルします実行ファイルをテストするための一例として、
:
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
これはすでに、できるだけバッファリングされないことに注意してください。それとも?
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
この次の出力(python2.7)を得た:
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
、その後2秒間何も...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
インターリーブこれは私のストリップダウンテストプログラムであります当初は期待されています。ただし、の後にサブプロセスの出力はまで表示されず、サブプロセスは終了します。より多くの行がパイプされて出力が得られるので、stdoutパイプにキャッシュ問題があると想定します。ここに掲載されている他の質問によると、(少なくともサブプロセスで)標準出力をフラッシュすることは、少なくともLinuxではうまくいくはずです。あなたがない限り
ありがとう、それは解決策です! – muckl
サブプロセスとスレッドの混在がなぜそんなに恐ろしいアプローチなのか尋ねてもよろしいですか?何も起こっていない間に何度も何度もノンブロッキングI/Oを呼び出すよりもエレガントなようです。明らかに、スレッドはスレッドセーフではないデータ構造にアクセスすべきではありませんが、キューからの読み取りと書き込みは安全です。 Python3.2バックポートの変更は、私のような単純なケースにとって重要ですか? – muckl
スレッドとサブプロセスの問題は、スレッドとforkを混在させる問題です。 http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-themおよび他のそのような記事を参照してください。 Python 3.2サブプロセスバックポートは、これらの問題を回避します。スレッドの一般的な主な問題は、制御やデバッグが難しいことです。たとえば、スレッドの「外側」からそれらを削除することはできません。したがって、スレッドが読み取りまたは書き込みに固執している場合は、何もできません。 –