2009-08-10 12 views
-1

サービスを実行している(Twisted jsonrpc server)。 "run_procs"を呼び出すと、サービスは多数のオブジェクトを調べ、タイムスタンププロパティを調べて実行する必要があるかどうかを調べます。必要な場合は、thread_pool(リスト)に追加され、thread_pool内のすべての項目が呼び出されたstart()メソッドを取得します。Python:スレッドを使用してサブプロセスを呼び出す。複数回開く。

私はこのセットアップを、クラス内で関数を実行したいと思っていたいくつかの他のアプリケーションに使用しました。しかし、各スレッドが呼び出す関数でsubprocess.Popen呼び出しを使用している場合、呼び出しは同時に実行されるのではなく、一度に1つずつ実行されます。ここで

は、いくつかのサンプルコードです:

class ProcService(jsonrpc.JSONRPC): 
     self.thread_pool = [] 
     self.running_threads = [] 
     self.lock = threading.Lock() 

     def clean_pool(self, thread_pool, join=False): 
       for th in [x for x in thread_pool if not x.isAlive()]: 
         if join: th.join() 
         thread_pool.remove(th) 
         del th 
       return thread_pool 

     def run_threads(self, parallel=10): 
       while len(self.running_threads)+len(self.thread_pool) > 0: 
         self.clean_pool(self.running_threads, join=True) 
         n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool)) 
         if n > 0: 
           for th in self.thread_pool[0:n]: th.start() 
           self.running_threads.extend(self.thread_pool[0:n]) 
           del self.thread_pool[0:n] 
         time.sleep(.01) 
       for th in self.running_threads+self.thread_pool: th.join() 

     def jsonrpc_run_procs(self): 
       for i, item in enumerate(self.items): 
         if item.should_run(): 
           self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item]))) 
       self.run_threads(5) 

     def run_proc(self, proc): 
       self.lock.acquire() 
       print "\nSubprocess started" 
       p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,) 
       stdout_value = proc.communicate('through stdin to stdout')[0] 
       self.lock.release() 

すべてのヘルプ/提案が高く評価されています。

* EDIT * OK。だから今私はstdoutパイプからの出力を読み返したい。これはいくつかの時に動作しますが、select.errorでも失敗します:(4、 'Interrupted system call')私はこれが、通信メソッドを実行しようとする前に、 run_procメソッドのコードに変更されました:

DEF run_proc(自己、PROC): self.lock.acquire() P = subprocess.Popen( self.running_procs.append #etc([P、 proc.data.id]) self.lock.release()私はself.run_threadsは(5)私はself.check_procs()

check_procs方法を反復ポーリングをチェックするrunning_procsのリストを呼び出して呼び出した後

()がNoneではありません。パイプから出力を得るにはどうすればいいですか?私は次の両方を試しました

calling check_procs once: 

def check_procs(self): 
    for proc_details in self.running_procs: 
     proc = proc_details[0] 
     while (proc.poll() == None): 
      time.sleep(0.1) 
     stdout_value = proc.communicate('through stdin to stdout')[0] 
     self.running_procs.remove(proc_details) 
     print proc_details[1], stdout_value 
     del proc_details 

calling check_procs in while loop like: 

while len(self.running_procs) > 0: 
    self.check_procs() 

def check_procs(self): 
    for proc_details in self.running_procs: 
     if (proc.poll() is not None): 
      stdout_value = proc.communicate('through stdin to stdout')[0] 
      self.running_procs.remove(proc_details) 
      print proc_details[1], stdout_value 
      del proc_details 

答えて

1

私は、キーコードがあると思う:

self.lock.acquire() 
    print "\nSubprocess started" 
    p = subprocess.Popen(# etc 
    stdout_value = proc.communicate('through stdin to stdout')[0] 
    self.lock.release() 

取得し、解放するための明示的な呼び出しがシリアライズを保証する必要があります - あなただけのように常にあなたが他の事をすればシリアライズを守りませんサブプロセスの代わりにこのブロックで使用しますか?

編集:ここでは、すべての沈黙ので、私は、ロックを削除し、代わりにQueue.Queue()インスタンス上の各stdout_valueを置くために提案を追加します - キューはintrinsicalyスレッドセーフ(独自のロックを扱う)であるので、あなたができるget(またはget_nowaitなど)は、準備が整い次第putになります。一般的に、Queueは、Pythonでスレッド通信(そしてしばしば同期も)を整える最良の方法です。

具体的には、最初にimport Queueを追加します。作って、買収してから解放してください。self.lock(その3行だけを削除してください)。 self.q = Queue.Queue()__init__に追加してください。コールの直後にstdout_value = proc.communicate(...文を追加self.q.put(stdout_value);今e。グラムは、すべての結果があることを確認するために

while not self.q.empty(): 
    result = self.q.get() 
    print 'One result is %r' % result 

jsonrpc_run_procs方法を終えます。 (通常、emptyのキューのメソッドは信頼できませんが、この場合、キューに入れるすべてのスレッドはすでに終了していますので、問題はありません)。

+0

私は現在キューを探しています。ありがとう。 – sberry

+0

キューを使用してサブプロセスを処理する簡単な例を教えてください。私は今までに何の成功もしていません... – sberry

+0

OK、編集内容で... –

1

特定の問題はおそらく、stdout_value = proc.communicate('through stdin to stdout')[0]という行によって発生している可能性があります。 Subprocess.communicateは"Wait for process to terminate"になります。これはロックとともに使用すると一度に1つずつ実行されます。

可能なことは、p変数をリストに追加して実行し、Subprocess APIを使用してサブプロセスが完了するのを待つだけです。メインスレッドの各サブプロセスを定期的にポーリングします。

第2の見通しでは、この行にも問題があるようです:for th in self.running_threads+self.thread_pool: th.join()。 Thread.join()は、スレッドが終了するのを待つ別のメソッドです。

+0

pをself.running_procsに追加するように変更しました。これにより、スレッドが必要なときに同時に実行できます。 self.running_procs内のp(procs)ごとにpoll()応答を定期的にチェックする最良の方法は何ですか? – sberry

+0

(len(self.running_procs)> 0): self.check_procs() そしてそれからcheck_procsで私はpoll()からの戻りをチェックしてプロセスが終了したかどうかを調べます。これは良い方法ですか? – sberry

+0

はい。多かれ少なかれ、次のようなものです: 'while thread.poll()==なし:time.sleep(x) self.running_procs.remove(thread)' – ACoolie

関連する問題