11
私は多くのワーカーが特定のデータを処理して結果を親プロセスに返すアルゴリズムに、Pythonマルチプロセッシングライブラリを使用します。私はマルチプロセッシングを使用します。ジョブを作業者に渡すためのコストと、結果を収集するための第2のコストです。作業者のPythonマルチプロセッシングと例外の処理
作業者がデータの一部を処理できなくなるまで、すべて正常に機能します。各ワーカー以下の簡略化した例では、2つのフェーズがあります
- 初期化 - この場合、作業者はこれをスキップする必要があり、失敗する可能性があるデータのチャンクを処理する - この場合、作業者は
- データ処理を破壊しなければならない、失敗する可能性がチャンクを作成し、次のデータを続行します。
これらのフェーズのいずれかが失敗すると、スクリプトの完了後にデッドロックが発生します。 init()
が失敗した
- 、どのように作業員が無効で検出し、それが完了するまで待機しない:私はこのコードを約2つの質問があり
import multiprocessing as mp import random workers_count = 5 # Probability of failure, change to simulate failures fail_init_p = 0.2 fail_job_p = 0.3 #========= Worker ========= def do_work(job_state, arg): if random.random() < fail_job_p: raise Exception("Job failed") return "job %d processed %d" % (job_state, arg) def init(args): if random.random() < fail_init_p: raise Exception("Worker init failed") return args def worker_function(args, jobs_queue, result_queue): # INIT # What to do when init() fails? try: state = init(args) except: print "!Worker %d init fail" % args return # DO WORK # Process data in the jobs queue for job in iter(jobs_queue.get, None): try: # Can throw an exception! result = do_work(state, job) result_queue.put(result) except: print "!Job %d failed, skip..." % job finally: jobs_queue.task_done() # Telling that we are done with processing stop token jobs_queue.task_done() #========= Parent ========= jobs = mp.JoinableQueue() results = mp.Queue() for i in range(workers_count): mp.Process(target=worker_function, args=(i, jobs, results)).start() # Populate jobs queue results_to_expect = 0 for j in range(30): jobs.put(j) results_to_expect += 1 # Collecting the results # What if some workers failed to process the job and we have # less results than expected for r in range(results_to_expect): result = results.get() print result #Signal all workers to finish for i in range(workers_count): jobs.put(None) #Wait for them to finish jobs.join()
:このコードは、私の問題をシミュレート?
do_work()
が失敗した場合、親プロセスに通知して結果の待ち行列に結果が少なくなるようにする方法を教えてください。
ありがとうございました!
または、エラーのインバンド通信を避けるために、結果キューに '(結果、エラー)'(エラーは成功しない場合)というタプルを入れることができます。 – jfs