サービスを実行している(Twisted jsonrpc server)。 "run_procs"を呼び出すと、サービスは多数のオブジェクトを調べ、タイムスタンププロパティを調べて実行する必要があるかどうかを調べます。必要な場合は、thread_pool(リスト)に追加され、thread_pool内のすべての項目が呼び出されたstart()メソッドを取得します。Python:スレッドを使用してサブプロセスを呼び出す。複数回開く。
私はこのセットアップを、クラス内で関数を実行したいと思っていたいくつかの他のアプリケーションに使用しました。しかし、各スレッドが呼び出す関数でsubprocess.Popen呼び出しを使用している場合、呼び出しは同時に実行されるのではなく、一度に1つずつ実行されます。ここで
は、いくつかのサンプルコードです:
class ProcService(jsonrpc.JSONRPC):
self.thread_pool = []
self.running_threads = []
self.lock = threading.Lock()
def clean_pool(self, thread_pool, join=False):
for th in [x for x in thread_pool if not x.isAlive()]:
if join: th.join()
thread_pool.remove(th)
del th
return thread_pool
def run_threads(self, parallel=10):
while len(self.running_threads)+len(self.thread_pool) > 0:
self.clean_pool(self.running_threads, join=True)
n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
if n > 0:
for th in self.thread_pool[0:n]: th.start()
self.running_threads.extend(self.thread_pool[0:n])
del self.thread_pool[0:n]
time.sleep(.01)
for th in self.running_threads+self.thread_pool: th.join()
def jsonrpc_run_procs(self):
for i, item in enumerate(self.items):
if item.should_run():
self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
self.run_threads(5)
def run_proc(self, proc):
self.lock.acquire()
print "\nSubprocess started"
p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,)
stdout_value = proc.communicate('through stdin to stdout')[0]
self.lock.release()
すべてのヘルプ/提案が高く評価されています。
* EDIT * OK。だから今私はstdoutパイプからの出力を読み返したい。これはいくつかの時に動作しますが、select.errorでも失敗します:(4、 'Interrupted system call')私はこれが、通信メソッドを実行しようとする前に、 run_procメソッドのコードに変更されました:
DEF run_proc(自己、PROC): self.lock.acquire() P = subprocess.Popen( self.running_procs.append #etc([P、 proc.data.id]) self.lock.release()私はself.run_threadsは(5)私はself.check_procs()
check_procs方法を反復ポーリングをチェックするrunning_procsのリストを呼び出して呼び出した後
()がNoneではありません。パイプから出力を得るにはどうすればいいですか?私は次の両方を試しました
calling check_procs once:
def check_procs(self):
for proc_details in self.running_procs:
proc = proc_details[0]
while (proc.poll() == None):
time.sleep(0.1)
stdout_value = proc.communicate('through stdin to stdout')[0]
self.running_procs.remove(proc_details)
print proc_details[1], stdout_value
del proc_details
calling check_procs in while loop like:
while len(self.running_procs) > 0:
self.check_procs()
def check_procs(self):
for proc_details in self.running_procs:
if (proc.poll() is not None):
stdout_value = proc.communicate('through stdin to stdout')[0]
self.running_procs.remove(proc_details)
print proc_details[1], stdout_value
del proc_details
私は現在キューを探しています。ありがとう。 – sberry
キューを使用してサブプロセスを処理する簡単な例を教えてください。私は今までに何の成功もしていません... – sberry
OK、編集内容で... –