私はワーカーを作成し、いくつかのタスクを与え、結果を処理するメインプロセスに結果を戻し、彼らはより多くの仕事をする。私は労働者を終了する前にこれを一定量繰り返す必要があります。マルチプロセッシング - ジョブをワーカーに送信し、結果を受信し、ワーカーを更新し、再計算する
私のコードは正常に動作しているようですが、すべてのワーカーが各繰り返しごとに更新されているわけではありません。例えば。出力は、私がいた受信:
Creating 4 workers
Finished workin 4
Finished workin 6
Finished workin 3
Finished workin 4
Pipe Documentationrecv()
による受信する何かがあるまでブロックしなければなりません。なぜ私の例ではこれは当てはまりませんか?
from multiprocessing import Process, Queue, JoinableQueue, Pipe, Event, Barrier, Value
import time
def worker(qu_task,qu_results,chi,ev,bar,val):
ext=False
dum=0
while True:
ev.wait()
while qu_task.empty() is False:
a=qu_task.get() # Pop work from queue
if a is None: #Poison pill
bar.wait() # Wait for all processes so that they all receive "None"
ext=True
qu_task.task_done() #
print("Finished workin", dum)
break
# Do stuff
time.sleep(.1)
qu_results.put(111)
qu_task.task_done()
# time.sleep(1.0)
if ext: # Break out if finished
break
data1, data2 = chi.recv() # Blocks until there its something to receive
dum+=data2
# Do stuff
def main(NProc, NScen, NTime, branch):
qu_tasks=JoinableQueue() # To use task_done() and join()
qu_results=Queue()
workers=[]
val=Value('i',0)
conn=[]
bar=Barrier(NProc)
ev=Event()
print("Creating ", NProc, " workers")
for i in range(NProc):
par,chi=Pipe()
workers.append(Process(target=worker,args=(qu_tasks,qu_results,chi,ev,bar,val)))
conn.append(par)
for w in workers:
w.start()
for t in reversed(range(NTime)):
for s in range(NScen):
a=0.0
for b in branch:
k=(b,s,t)
qu_tasks.put(k) # Send work
ev.set() # Start workers
qu_tasks.join() # Synchronize workers
ev.clear() # Reset lock
for idx,i in enumerate(branch):
a+=qu_results.get()
#print(a)
kk=[1,2,3,5]
for idx,c in enumerate(conn):
c.send(([i*(1+idx) for i in kk],1)) # Send more data to workers
for i in range(NProc): # Kill workers
qu_tasks.put(None)
ev.set()
qu_tasks.join()
for w in workers: # Wait for them to finish
w.join()
if __name__=="__main__":
branch=[i for i in range(1,7)]
NTime=2
NScen=3
main(4,NScen, NTime, branch)
私はまだ私が望む動作を与える解決策を見つけることができません。万が一コード例を投稿できますか? – martihj