2012-08-16 20 views
18

私はprogram.pyの複数のインスタンスを同時に実行したいと同時に、同時に実行されているインスタンスの数を制限しています(例えば、自分のシステムで利用可能なCPUコアの数)。たとえば、コアが10個あり、合計で1000回のprogram.pyを実行する必要がある場合、任意の時点で10個のインスタンスしか作成および実行されません。実行中のプロセス数を制限しながらPythonでのマルチプロセッシング

私はマルチプロセッシングモジュールの使用、マルチスレッド化、キューの使用を試みましたが、簡単に実装できるように思えるものは何もありません。私が持っている最大の問題は、同時に実行されているプロセスの数を制限する方法を見つけることです。一度に1000のプロセスを作成すると、フォークボンベと同等になるため、これは重要です。プログラムからプロセスから返される結果(ディスクに出力)は必要ありません。プロセスはすべて互いに独立して実行されます。

誰でも私に提案するか、これをPythonやBashでどのように実装できるのか教えてください。私は今までにキューを使って書いたコードを投稿しますが、それは意図したとおりには動作せず、間違ったパスになっている可能性があります。

多くのありがとうございます。

むしろPythonのより
+2

[Python process pools](http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool)を試しましたか? – C2H5OH

+0

これを行う最も簡単な方法は、 'multiprocessing.pool'を作成し、ワーカー(program.py)スレッドを生成し、インスタンスをインスタンスとして再割り当てして終了する「コントローラ」プログラムを作成することです。 – jozzas

+0

ありがとう、私はこれを試してみます。何らかの理由で私の最初の試みで、私は結論に達しました。マルチプロセッシングは私が望んでいたものではありませんでしたが、今は正しいようです。だから、この場合、ワーカースレッドはprogram.pyを(スレッドとして?subprocess.Popenで)生成するだけでしょうか?私が従うことができる大まかな例やテンプレート実装を投稿してください。 – steadfast

答えて

2

bashスクリプトが、私は、単純な並列処理のために、多くの場合、それを使用する:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

または非常に単純な場合のために、別のオプションは、Pがprocsの

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3
の数を設定しますxargsのです

プロセス管理者を使用する必要があります。 1つのアプローチは、Circusで提供されているAPIを使用して「プログラムで」行うことですが、ドキュメンテーションサイトは現在オフラインですが、ちょっとした問題だと思いますが、サーカスを使ってこれを処理できます。もう1つの方法は、supervisordを使用し、プロセスのパラメータnumprocsにあなたが持っているコアの数を設定することです。

サーカス使用例:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

を私はあなたがPool.mapアプローチはあなたに多くの意味を成していないと述べた知っています。マップは、仕事のソースを与える簡単な方法であり、各項目に適用する呼び出し可能です。マップのfuncは、指定されたargに対して実際の作業を行うためのエントリポイントです。

それがあなたのために右のように見えるしていない場合、私は生産者 - 消費者パターンを使用する方法についてこちらにかなり詳細な回答を持っています。基本的にhttps://stackoverflow.com/a/11196615/496445

、あなたがキューを作成し、労働者のN番号を開始します。次に、メインスレッドからキューをフィードするか、キューにフィードするProducerプロセスを作成します。ワーカーは待ち行列から作業を続けるだけで、開始したプロセスの数より多くの並行作業が発生することはありません。

また、生産者が消費する速度とリソースにも制約を加える必要がある場合は、すでに優れた作業が多すぎるときにプロデューサをブロックするように、キューに制限を付けることもできます。

呼び出される作業関数は、任意のことを行うことができます。これは、いくつかのシステムコマンドのラッパーでも、Pythonのlibをインポートしてメインルーチンを実行することもできます。限られたリソースの下で任意の実行可能ファイルを実行するようにconfigsを設定できるようにする特定のプロセス管理システムがありますが、これは基本的なPythonのアプローチです。私のことother answerから

スニペット:

基本プール:マルチプロセッシングの使用に関する多くの答えがありますが、プロセスマネージャとプロデューサー

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

非常に良い例ですが、私はCPUSの数をhttp://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-coreで説明しているように改善しました。したがって、num_workersに基づいてnum_workersを設定することができましたマシンのCPU。 –

0

を使用して

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

.pool、hには多くのコードスニペットはありませんこれはメモリ使用が重要な場合に実際にはより有益です。 1000プロセスを開始すると、CPUが過負荷になり、メモリが強制終了します。各プロセスとそのデータパイプラインがメモリを大量に使用する場合、OSやPython自体が並列プロセスの数を制限します。私はバッチでCPUに提出されたジョブの同時数を制限するために以下のコードを開発しました。バッチサイズは、CPUコアの数に比例して調整できます。私のWindowsのPCでは、1バッチあたりのジョブの数は、利用可能なCPUクーを4倍まで効率的にすることができます。

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

唯一の問題は、任意のバッチジョブのいずれかが完了し、吊り状況につながることができなかった場合は、ジョブのバッチの残りが開始されることはありません、です。したがって、処理される関数には適切なエラー処理ルーチンが必要です。

関連する問題