0

私の考えでは、exeファイルの並列実行にPythonのマルチプロセッシングを使用することに根本的な問題があるかどうかを確認できますか?並列処理のためのマルチプロセッシングプロセス()の正しい使い方

私は膨大な数のジョブ(サンプルコードでは100000)を使用しており、使用可能なすべてのコア(自分のコンピュータでは16個)を並列に使用したいと考えています。以下のコードは、私が見ている多くの例のようにキューを使用していませんが、動作するようです。ちょうどコードが「うまくいく」状況を避けたいが、いくつかのコンピューティング・ノードで実行するためにこれをスケールアップすると、巨大な間違いが起こって爆発するのを待っている。誰も助けることができますか?

import subprocess 
import multiprocessing 

def task_fn(task_dir) : 
    cmd_str = ["my_exe","-my_exe_arguments"] 
    try : 
     msg = subprocess.check_output(cmd_str,cwd=task_dir,stderr=subprocess.STDOUT,universal_newlines=True) 
    except subprocess.CalledProcessError as e : 
     with open("a_unique_err_log_file.log","w") as f : 
      f.write(e.output) 
    return; 

if __name__ == "__main__": 

    n_cpu = multiprocessing.cpu_count() 
    num_jobs = 100000 
    proc_list = [multiprocessing.Process() for p in range(n_cpu)] 

    for i in range(num_jobs): 
     task_dir = str(i) 
     task_processed = False 
     while not(task_processed) : 
      # Search through all processes in p_list repeatedly until a 
      # terminated processs is found to take on a new task 
      for p in range(len(p_list)) : 
       if not(p_list[p].is_alive()) : 
        p_list[p] = multiprocessing.Process(target=task_fn,args=(task_dir,)) 
        p_list[p].start() 
        task_processed = True 

    # At the end of the outermost for loop 
    # Wait until all the processes have finished 
    for p in p_list : 
     p.join() 

    print("All Done!") 

答えて

1

自分でプロセスを作成して管理する代わりに、Pool of workersを使用してください。それはあなたのためにすべてを扱うように設計されています。

従業員がサブプロセスを作成しているので、プロセスではなくスレッドを使用できます。

また、作業者は同じファイルに書き込むようです。同時インスタンスからのアクセスを保護する必要があります。そうしないと、結果は完全に順不同です。

from threading import Lock 
from concurrent.futures import ThreadPoolExecutor 


mutex = Lock() 
task_dir = "/tmp/tasks" 


def task_fn(task_nr): 
    """This function will run in a separate thread.""" 
    cmd_str = ["my_exe","-my_exe_arguments"] 
    try: 
     msg = subprocess.check_output(cmd_str, cwd=task_dir, stderr=subprocess.STDOUT, universal_newlines=True) 
    except subprocess.CalledProcessError as e: 
     with mutex: 
      with open("a_unique_PROTECTED_err_log_file.log", "w") as f : 
       f.write(e.output) 

    return task_nr 


with ThreadPoolExecutor() as pool: 
    iterator = pool.map(task_fn, range(100000)) 
    for result in iterator: 
     print("Task %d done" % result) 
+0

お返事ありがとうございます。試してみる。しかし、基本的に私がそれをコーディングする方法に間違ったことはありませんか? – bFig8

+0

はいあります。そのままでは、エラーが発生した場合、保護されていないファイルへのアクセスが原因で、ログファイルがすべてスクランブルされます。さらに、プロセスの使用は過度なものです。スレッドだけを使用してください。 – noxdafox

+0

okおそらく誤解があります。「a_unique_err_log_file.log」は、すべてのログファイルが一意であると言う怠け者でした。文字通りファイル名に "a_unique_err_log_file.log"という文字列があるので、すべてのプロセスが実際に一意のファイルに書き込まれるようになりました。 – bFig8

関連する問題