2017-03-08 11 views
0

延長せ、キューと労働者のプールを使用して:マルチプロセッシングは、労働者は、私はPythonでマルチプロセッシングを理解しようとしていますが、現在、以下に苦しんでいたキュー

私はAからオブジェクトを供給したい労働者のプールで出始めにジェネレータ機能を待ち行列に入れ、それは次にワーカーによって消費される。これはうまくいきますが、私は現在、プログラムを拡張して、ワーカーが作業をキューに追加できるようにしたいと考えています。しかし、これは私が最初のループで追加した作業の直後に2番目のループに追加されたストップコードがあるため(例のコードを参照)、問題にぶつかる部分です。つまり、ワーカーが追加した追加作業は決して実行されません。

キューが空で、ワーカーが何もしていないかどうかを確認する方法は、労働者を止める最後のforループに進む前に。しかし、私は労働者の状態をチェックする方法を知らない。私は私が欲しかったものを実装するために管理カウンティングセマフォ値を使用して

import multiprocessing, time, random 

def f(queue): 
    worker_name = multiprocessing.current_process().name 
    print "Started: {}".format(worker_name) 

    while True: 
     value = queue.get() 
     if value is None: 
      break 

     print "{} is processing '{}'".format(worker_name, value) 
     # compute(value) 
     time.sleep(1) 

     # Worker may add additional work to queue 
     if random.random() > 0.7: 
      queue.put("Extra work!") 

    print "Stopping: {}".format(worker_name) 


n_workers = 4 
queue = multiprocessing.Queue() 
pool = multiprocessing.Pool(n_workers, f, (queue,)) 

# Feed large objects from generator 
for i in xrange(20): 
    queue.put(i) 

# All extra work is skipped 

# Terminate workers after finishing work 
for __ in xrange(n_workers): 
    queue.put(None) 

pool.close() 
pool.join() 

print "Finished!" 
print queue.get() # Will yield 'Extra Work!' should be empty 

答えて

0

最小限のコードでは、一例を示します。この値を増減して各作業者の活動を追跡し、キューが空の場合はすぐにワーカーが何も処理しなくなります。

アドバイスありがとうございます。

例コード:

import multiprocessing, time, random 

def f(queue, semaphore): 
    worker_name = multiprocessing.current_process().name 
    print "Started: {}".format(worker_name) 

    while True: 
     value = queue.get() 
     if value is None: 
      break 

     with semaphore.get_lock(): 
      semaphore.value -= 1 

     print "{} is processing '{}'".format(worker_name, value) 
     # compute(value) 
     time.sleep(1) 

     # Worker may add additional work to queue 
     if random.random() > 0.7: 
      queue.put("Extra work!") 

     with semaphore.get_lock(): 
      semaphore.value += 1 

    print "Stopping: {}".format(worker_name) 


n_workers = 4 
semaphore = multiprocessing.Value('i', n_workers) 
queue = multiprocessing.Queue() 
pool = multiprocessing.Pool(n_workers, f, (queue, semaphore)) 

# Feed large objects from generator 
for i in xrange(20): 
    queue.put(i) 

while not queue.empty() or semaphore.value != n_workers: 
    time.sleep(0.2) 

# Terminate workers after finishing work 
for __ in xrange(n_workers): 
    queue.put(None) 

pool.close() 
pool.join() 

print "Finished!" 
print queue.empty() # True 
関連する問題