2017-05-09 9 views
1

私はワーカーを作成し、いくつかのタスクを与え、結果を処理するメインプロセスに結果を戻し、彼らはより多くの仕事をする。私は労働者を終了する前にこれを一定量繰り返す必要があります。マルチプロセッシング - ジョブをワーカーに送信し、結果を受信し、ワーカーを更新し、再計算する

私のコードは正常に動作しているようですが、すべてのワーカーが各繰り返しごとに更新されているわけではありません。例えば。出力は、私がいた受信:

Creating 4 workers 
Finished workin 4 
Finished workin 6 
Finished workin 3 
Finished workin 4 

Pipe Documentationrecv()による受信する何かがあるまでブロックしなければなりません。なぜ私の例ではこれは当てはまりませんか?

from multiprocessing import Process, Queue, JoinableQueue, Pipe, Event, Barrier, Value 
import time 

def worker(qu_task,qu_results,chi,ev,bar,val): 
    ext=False 
    dum=0 
    while True: 
     ev.wait() 
     while qu_task.empty() is False: 
      a=qu_task.get() # Pop work from queue 
      if a is None: #Poison pill 
       bar.wait() # Wait for all processes so that they all receive "None" 
       ext=True 
       qu_task.task_done() # 
       print("Finished workin", dum) 
       break 

      # Do stuff 
      time.sleep(.1) 
      qu_results.put(111) 
      qu_task.task_done() 

#  time.sleep(1.0) 

     if ext: # Break out if finished 
      break 

     data1, data2 = chi.recv() # Blocks until there its something to receive 
     dum+=data2 
     # Do stuff 

def main(NProc, NScen, NTime, branch): 
    qu_tasks=JoinableQueue() # To use task_done() and join() 
    qu_results=Queue() 
    workers=[] 
    val=Value('i',0) 
    conn=[] 
    bar=Barrier(NProc) 
    ev=Event() 
    print("Creating ", NProc, " workers") 
    for i in range(NProc): 
     par,chi=Pipe() 
     workers.append(Process(target=worker,args=(qu_tasks,qu_results,chi,ev,bar,val))) 
     conn.append(par) 

    for w in workers: 
     w.start() 

    for t in reversed(range(NTime)): 
     for s in range(NScen): 
      a=0.0 
      for b in branch: 
       k=(b,s,t) 
       qu_tasks.put(k) # Send work 

      ev.set() # Start workers 
      qu_tasks.join() # Synchronize workers 
      ev.clear() # Reset lock 
      for idx,i in enumerate(branch): 
       a+=qu_results.get() 
      #print(a) 

      kk=[1,2,3,5] 
      for idx,c in enumerate(conn): 
       c.send(([i*(1+idx) for i in kk],1)) # Send more data to workers 

    for i in range(NProc): # Kill workers 
     qu_tasks.put(None) 
    ev.set() 
    qu_tasks.join() 

    for w in workers: # Wait for them to finish 
     w.join() 

if __name__=="__main__": 
    branch=[i for i in range(1,7)] 
    NTime=2 
    NScen=3 

    main(4,NScen, NTime, branch) 

答えて

0

Pipe.recv()お待ちください、心配しないでください。 Noneのためループが早く終了するので、6回呼び出されることはありません。

、非同期プログラミングでもあることを非同期を覚えて、そしてあなたの周りのことtime.sleep(.1)を移動した場合、作業者がPipe.recv()

に到達することなく、「次」のバッチからのメッセージを消費して開始したときに、あなたのコミュニケーション+処理パターンのように予定がありますあなたは "仕上げ作業0"にも到達することができます。

また、(Joinable)Queue.put()ループの速度を遅くすると、すべてのタスクをディスパッチするよりも早く中間ループが空になるため、ワーカーはPipe.recv()に到達しますが、メインプログラムは待機するJoinableQueue.join()

提案:パイプまたは(結合可能な)キューを選択しますが、両方を選択しないでください。それらを連動させることは単なる難しく、何の利益ももたらさない。

+0

私はまだ私が望む動作を与える解決策を見つけることができません。万が一コード例を投稿できますか? – martihj

関連する問題