あなたはパラレルサブプロセスの最大数を制限するために、複数のPythonのプロセスあるいはスレッドを必要としない:
from itertools import izip_longest
from subprocess import Popen, STDOUT
groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT)
for cmd in commands)] * limit # itertools' grouper recipe
for processes in izip_longest(*groups): # run len(processes) == limit at a time
for p in filter(None, processes):
p.wait()
Iterate an iterator by chunks (of n) in Python?を参照してください。
並列サブプロセスの最大数と最小数を両方とも制限したい場合は、スレッドプールを使用できます。
from multiprocessing.pool import ThreadPool
from subprocess import STDOUT, call
def run(cmd):
return cmd, call(cmd, stdout=outputfile, stderr=STDOUT)
for cmd, rc in ThreadPool(limit).imap_unordered(run, commands):
if rc != 0:
print('{cmd} failed with exit status: {rc}'.format(**vars()))
はできるだけ早くlimit
サブプロセスのいずれかが終了すると、新しいサブプロセスは、すべての回でのサブプロセスのlimit
数を維持するために開始されました。
またはThreadPoolExecutor
を使用して:
from concurrent.futures import ThreadPoolExecutor # pip install futures
from subprocess import STDOUT, call
with ThreadPoolExecutor(max_workers=limit) as executor:
for cmd in commands:
executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT)
は簡単なスレッドプールの実装です:
import subprocess
from threading import Thread
try: from queue import Queue
except ImportError:
from Queue import Queue # Python 2.x
def worker(queue):
for cmd in iter(queue.get, None):
subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT)
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)]
for t in threads: # start workers
t.daemon = True
t.start()
for cmd in commands: # feed commands to threads
q.put_nowait(cmd)
for _ in threads: q.put(None) # signal no more commands
for t in threads: t.join() # wait for completion
例外処理を追加し、時期尚早の終了を回避するために。
サブプロセスの出力を文字列で取得する場合は、Python: execute cat subprocess in parallelを参照してください。
これにより、完全に並列処理が無効になります。 – qed
問題は 'multiprocessing'モジュールを使うことであり、各ワーカーは別々のプロセスで生成されるので、あるワーカーでwait()を実行しても他のワーカーが実行されることはありません。つまり、これは正確ではありません。この例では、ワーカーから何も返されないため、結果に '.get()'を呼び出すと何も返されません。 – larsks