2017-03-16 12 views
1

Pythonから約8000個のファイルを外部コマンドで起動したい。すべてのファイルは他のファイルとは独立して処理されます。唯一の制約は、すべてのファイルが処理された後に実行を続けることです。私は2つの論理コア(multiprocessing.cpu_count()が8を返します)を持つ4つの物理コアを持っています。私の考えは、8つのコアのうちの4つで実行される4つの並列独立プロセスのプールを使用することでした。その間、私のマシンは使えるはずです。何千ものファイルを外部コマンドで多重処理する

import multiprocessing 
import subprocess 
import os 
from multiprocessing.pool import ThreadPool 


def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.Popen, (cmd,)) 
    pool.close() 
    pool.join() 


def main(): 
    process_files('dir1', 'dir2', 'mol:H') 
    do_some_stuff('dir2') 
    process_files('dir2', 'dir3', 'mol:a') 
    do_more_stuff('dir3') 

シーケンシャル治療は、100個のファイルのバッチのために120秒をとります。ここでは

は、私が何をしてきたのです。上記で概説したマルチプロセッシングバージョン(機能process_files)は、バッチに対して20秒しかかかりません。しかし、私がprocess_filesを8000個のファイル全体で実行すると、PCがハングし、1時間後にフリーズしません。

私の質問は以下のとおりです。

1)私は)ThreadPoolが正確には、ここでmultiprocessing.cpu_count()/2プロセスのプロセスのプールを(初期化することになっていると思いました。しかし私のコンピュータは8000のファイルにぶら下がっているが、100ではなく、プールのサイズが考慮されていないことを示唆している。それか、私は何か間違ったことをしています。あなたは説明できますか?

2)Pythonで独立したプロセスを起動するには、それぞれが外部コマンドを起動する必要があり、すべてのリソースが処理で取り込まれないようにするのが正しい方法ですか?

+0

@larsks( 'ThreadPool'は' apply_async'とサブプロセス 'call's)で提案されたソリューションと@Roland Smith(' Popen'オブジェクトを使った手動プール管理)を比較しました。私のベンチマークでは、 'ThreadPool'の方が実際の方が高速です。両方とも非常にありがとう! – user3638629

答えて

1

あなたの基本的な問題は、subprocess.Popenの使用だと思います。その方法はではありません。はコマンドが完了するのを待ってから復帰します。関数はすぐに(コマンドがまだ実行中であっても)返るので、ThreadPoolに関する限り関数は終了し、別のスレッドを生成することができます。つまり、8000程度のプロセスが生成されます。

おそらくsubprocess.check_call使用して、より良い運を持っているでしょう:だから

Run command with arguments. Wait for command to complete. If 
the exit code was zero then return, otherwise raise 
CalledProcessError. The CalledProcessError object will have the 
return code in the returncode attribute. 

:あなたが本当に終了コードを気にしない場合は

def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.check_call, (cmd,)) 
    pool.close() 
    pool.join() 

は、その後、あなたがsubprocess.callをすることができ、これはしませんプロセスからのゼロ以外の終了コードが発生した場合に例外を発生させます。

+0

この非常に明快でストレートな事実の説明をありがとう。実際、 'サブプロセス.Popen'は非常に多くのプロセスの産卵を引き起こすものでなければなりません。私は 'subprocess.call'を使用していませんでした。これは、Pythonがプールを有用な労働者で満たす代わりにプロセスが完了するのを待つことを考えていました。しかし、それが最初にプールがある理由です。 (申し訳ありません、担当者が低すぎる、アップアップすることはできません) – user3638629

+0

この回答の左側にあるチェックマークをクリックすることで、これを「受け入れられた」回答とすることができます。 – larsks

+0

はい、私は知っています。問題は、2つの非常に有益な回答(私は現在、提案されている2つのソリューションに基づいて結果をテストしています)を決定するのに苦労しています。 :D – user3638629

1

Python 3を使用している場合は、mapメソッドをconcurrent.futures.ThreadPoolExecutorとすることを検討します。

また、サブプロセスのリストを自分で管理することもできます。

次の例では、ビデオファイルをTheora/Vorbis形式に変換するためにffmpegを開始する関数を定義しています。開始された各サブプロセスのPopenオブジェクトを返します。メインプログラムで

def startencoder(iname, oname, offs=None): 
    args = ['ffmpeg'] 
    if offs is not None and offs > 0: 
     args += ['-ss', str(offs)] 
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a', 
      'libvorbis', '-q:a', '3', '-sn', oname] 
    with open(os.devnull, 'w') as bb: 
     p = subprocess.Popen(args, stdout=bb, stderr=bb) 
    return p 

、実行サブプロセスを表すPopenオブジェクトのリストは、このように維持されます。

outbase = tempname() 
ogvlist = [] 
procs = [] 
maxprocs = cpu_count() 
for n, ifile in enumerate(argv): 
    # Wait while the list of processes is full. 
    while len(procs) == maxprocs: 
     manageprocs(procs) 
    # Add a new process 
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1) 
    procs.append(startencoder(ifile, ogvname, offset)) 
    ogvlist.append(ogvname) 
# All jobs have been submitted, wail for them to finish. 
while len(procs) > 0: 
    manageprocs(procs) 

したがって、新しいプロセスは、コアよりも実行中のサブプロセスが少ない場合にのみ開始されます。複数回使用されるコードは、manageprocs機能に分かれています。

def manageprocs(proclist): 
    for pr in proclist: 
     if pr.poll() is not None: 
      proclist.remove(pr) 
    sleep(0.5) 

sleepへの呼び出しは、プログラムがループ内で回転しないようにするために使用されます。

+0

'concurrent.futures.ThreadPoolExecutor'について言及してくれてありがとう(ここでもPython 2.7を使用しています)。この素晴らしいマニュアルプール管理の例をありがとう。私は似たようなことをしようとしていた(それを繰り返す間にリスト上で 'remove 'を行うべきではないと思ったが)何かが間違っていたはずである。私はすぐにこのソリューションをテストします。 (申し訳ありませんが、あまりにも低い、アップアップすることはできません。) – user3638629

+0

2つの方法(あなたの答えと@larsks ')を比較しました。私はこの解決策を非常に気に入っていますが、プールを手動で管理するとオーバーヘッドが発生すると思われます。これはおそらく 'sleep(スリープ) 'を呼び出したためです(プロセスマネージャを0.2秒以上スリープさせました)。実際の入力の1/10のバッチテストでは、手動プール管理は 'cpu_count() - 1'コアの' ThreadPool'より8%遅く、 'cpu_count()/ 2'の' ThreadPool'よりも27%遅いです。コア。 – user3638629

+0

違いがどこから来るのかを知るには、実際のプロファイリングを行う必要があります。物に影響を与える多くの要因があります。例えば、 'cpu_count()'がサブプロセスの最適量*であるとは限りません。おそらく 'cpu_count()/ 2'から' cpu_count()* 2'までの範囲で試してみるべきでしょう。さらに、通常は 'molconvert 'にかかる時間に応じて' sleep'の量を調整するべきです。しかし、私はPython 3に完全に切り替えたので、最近のようなものには 'concurrent.futures.ThreadPoolExecutor'を使う傾向があります。 –

関連する問題