2012-03-21 17 views
10

subprocessを使用することがわかりました。pythonで外部コマンドを呼び出すために使用するサブプロセスの数を制御します

しかし、私はparallでいくつかのコマンドを実行したいが、生成されるプロセスの数を制限したいのですが?私が気にするのは、私がサブプロセスをブロックすることができないということです。例えば、私は

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile) 

を呼び出す場合、プロセスは、仕上げにcmdを待たずに、継続します。したがって、私はそれをmultiprocessingライブラリの作業員にまとめられません。例えば

私がしなければ、:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile); 

pool = Pool(processes = 10); 
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list]; 
ans = [res.get() for res in results]; 

その後、各ワーカーは、サブプロセスを産卵した後に終了し、戻ります。だから私は実際にsubprocessによって生成されるプロセスの数をPoolを使って制限することはできません。

サブプロセスの数を制限する適切な方法は何ですか?

答えて

6

コマンドが完了するのを待つ場合は、subprocess.callを使用できます。詳細については、pydoc subprocessを参照してください。

また、あなたの労働者にPopen.waitメソッドを呼び出すことができます。

def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile); 
    p.wait() 
+0

これにより、完全に並列処理が無効になります。 – qed

+2

問題は 'multiprocessing'モジュールを使うことであり、各ワーカーは別々のプロセスで生成されるので、あるワーカーでwait()を実行しても他のワーカーが実行されることはありません。つまり、これは正確ではありません。この例では、ワーカーから何も返されないため、結果に '.get()'を呼び出すと何も返されません。 – larsks

7

あなたはパラレルサブプロセスの最大数を制限するために、複数のPythonのプロセスあるいはスレッドを必要としない:

from itertools import izip_longest 
from subprocess import Popen, STDOUT 

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT) 
      for cmd in commands)] * limit # itertools' grouper recipe 
for processes in izip_longest(*groups): # run len(processes) == limit at a time 
    for p in filter(None, processes): 
     p.wait() 

Iterate an iterator by chunks (of n) in Python?を参照してください。

並列サブプロセスの最大数と最小数を両方とも制限したい場合は、スレッドプールを使用できます。

from multiprocessing.pool import ThreadPool 
from subprocess import STDOUT, call 

def run(cmd): 
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT) 

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands): 
    if rc != 0: 
     print('{cmd} failed with exit status: {rc}'.format(**vars())) 

はできるだけ早くlimitサブプロセスのいずれかが終了すると、新しいサブプロセスは、すべての回でのサブプロセスのlimit数を維持するために開始されました。

またはThreadPoolExecutorを使用して:

from concurrent.futures import ThreadPoolExecutor # pip install futures 
from subprocess import STDOUT, call 

with ThreadPoolExecutor(max_workers=limit) as executor: 
    for cmd in commands: 
     executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT) 

は簡単なスレッドプールの実装です:

import subprocess 
from threading import Thread 

try: from queue import Queue 
except ImportError: 
    from Queue import Queue # Python 2.x 


def worker(queue): 
    for cmd in iter(queue.get, None): 
     subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT) 

q = Queue() 
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)] 
for t in threads: # start workers 
    t.daemon = True 
    t.start() 

for cmd in commands: # feed commands to threads 
    q.put_nowait(cmd) 

for _ in threads: q.put(None) # signal no more commands 
for t in threads: t.join() # wait for completion 

例外処理を追加し、時期尚早の終了を回避するために。

サブプロセスの出力を文字列で取得する場合は、Python: execute cat subprocess in parallelを参照してください。

関連する問題