11

私は多くのワーカーが特定のデータを処理して結果を親プロセスに返すアルゴリズムに、Pythonマルチプロセッシングライブラリを使用します。私はマルチプロセッシングを使用します。ジョブを作業者に渡すためのコストと、結果を収集するための第2のコストです。作業者のPythonマルチプロセッシングと例外の処理

作業者がデータの一部を処理できなくなるまで、すべて正常に機能します。各ワーカー以下の簡略化した例では、2つのフェーズがあります

  • 初期化 - この場合、作業者はこれをスキップする必要があり、失敗する可能性があるデータのチャンクを処理する - この場合、作業者は
  • データ処理を破壊しなければならない、失敗する可能性がチャンクを作成し、次のデータを続行します。

これらのフェーズのいずれかが失敗すると、スクリプトの完了後にデッドロックが発生します。 init()が失敗した

  1. 、どのように作業員が無効で検出し、それが完了するまで待機しない:私はこのコードを約2つの質問があり

    import multiprocessing as mp 
    import random 
    
    workers_count = 5 
    # Probability of failure, change to simulate failures 
    fail_init_p = 0.2 
    fail_job_p = 0.3 
    
    
    #========= Worker ========= 
    def do_work(job_state, arg): 
        if random.random() < fail_job_p: 
         raise Exception("Job failed") 
        return "job %d processed %d" % (job_state, arg) 
    
    def init(args): 
        if random.random() < fail_init_p: 
         raise Exception("Worker init failed") 
        return args 
    
    def worker_function(args, jobs_queue, result_queue): 
        # INIT 
        # What to do when init() fails? 
        try: 
         state = init(args) 
        except: 
         print "!Worker %d init fail" % args 
         return 
        # DO WORK 
        # Process data in the jobs queue 
        for job in iter(jobs_queue.get, None): 
         try: 
          # Can throw an exception! 
          result = do_work(state, job) 
          result_queue.put(result) 
         except: 
          print "!Job %d failed, skip..." % job 
         finally: 
          jobs_queue.task_done() 
        # Telling that we are done with processing stop token 
        jobs_queue.task_done() 
    
    
    
    #========= Parent ========= 
    jobs = mp.JoinableQueue() 
    results = mp.Queue() 
    for i in range(workers_count): 
        mp.Process(target=worker_function, args=(i, jobs, results)).start() 
    
    # Populate jobs queue 
    results_to_expect = 0 
    for j in range(30): 
        jobs.put(j) 
        results_to_expect += 1 
    
    # Collecting the results 
    # What if some workers failed to process the job and we have 
    # less results than expected 
    for r in range(results_to_expect): 
        result = results.get() 
        print result 
    
    #Signal all workers to finish 
    for i in range(workers_count): 
        jobs.put(None) 
    
    #Wait for them to finish 
    jobs.join() 
    

    :このコードは、私の問題をシミュレート?

  2. do_work()が失敗した場合、親プロセスに通知して結果の待ち行列に結果が少なくなるようにする方法を教えてください。

ありがとうございました!

答えて

10

コードを少し変更して機能させました(下記の説明を参照)。

import multiprocessing as mp 
import random 

workers_count = 5 
# Probability of failure, change to simulate failures 
fail_init_p = 0.5 
fail_job_p = 0.4 


#========= Worker ========= 
def do_work(job_state, arg): 
    if random.random() < fail_job_p: 
     raise Exception("Job failed") 
    return "job %d processed %d" % (job_state, arg) 

def init(args): 
    if random.random() < fail_init_p: 
     raise Exception("Worker init failed") 
    return args 

def worker_function(args, jobs_queue, result_queue): 
    # INIT 
    # What to do when init() fails? 
    try: 
     state = init(args) 
    except: 
     print "!Worker %d init fail" % args 
     result_queue.put('init failed') 
     return 
    # DO WORK 
    # Process data in the jobs queue 
    for job in iter(jobs_queue.get, None): 
     try: 
      # Can throw an exception! 
      result = do_work(state, job) 
      result_queue.put(result) 
     except: 
      print "!Job %d failed, skip..." % job 
      result_queue.put('job failed') 


#========= Parent ========= 
jobs = mp.Queue() 
results = mp.Queue() 
for i in range(workers_count): 
    mp.Process(target=worker_function, args=(i, jobs, results)).start() 

# Populate jobs queue 
results_to_expect = 0 
for j in range(30): 
    jobs.put(j) 
    results_to_expect += 1 

init_failures = 0 
job_failures = 0 
successes = 0 
while job_failures + successes < 30 and init_failures < workers_count: 
    result = results.get() 
    init_failures += int(result == 'init failed') 
    job_failures += int(result == 'job failed') 
    successes += int(result != 'init failed' and result != 'job failed') 
    #print init_failures, job_failures, successes 

for ii in range(workers_count): 
    jobs.put(None) 

私の変更:

  1. は、(代わりにJoinableQueueの)Queueちょうど正常であることがjobsを変更しました。
  2. 労働者は特別な結果の文字列 "init failed"と "job failed"を返します。
  3. マスタプロセスは、特定の条件が有効である限り、上記の特別な結果を監視します。
  4. 最後に、あなたが持っている多くの労働者の「停止」リクエスト(つまり、Noneジョブ)を入力します。これらのすべてがキューから引き出されるわけではないことに注意してください(作業者が初期化に失敗した場合)。

ところで、元のコードは使いやすくて使いやすいものでした。ランダム確率ビットはかなりクールです。

+2

または、エラーのインバンド通信を避けるために、結果キューに '(結果、エラー)'(エラーは成功しない場合)というタプルを入れることができます。 – jfs

関連する問題