2009-10-08 6 views
15

multiprocessing.Queueをリストにダンプしたい。その作業のために、私は次の関数を書いた:マルチプロセッシングをダンプする.Queueをリストに入れる

import Queue 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    # START DEBUG CODE 
    initial_size = queue.qsize() 
    print("Queue has %s items initially." % initial_size) 
    # END DEBUG CODE 

    while True: 
     try: 
      thing = queue.get(block=False) 
      result.append(thing) 
     except Queue.Empty: 

      # START DEBUG CODE 
      current_size = queue.qsize() 
      total_size = current_size + len(result) 
      print("Dumping complete:") 
      if current_size == initial_size: 
       print("No items were added to the queue.") 
      else: 
       print("%s items were added to the queue." % \ 
         (total_size - initial_size)) 
      print("Extracted %s items from the queue, queue has %s items \ 
      left" % (len(result), current_size)) 
      # END DEBUG CODE 

      return result 

しかし、何らかの理由でうまくいきません。

は、次のシェルセッションを守ってください。

>>> import multiprocessing 
>>> q = multiprocessing.Queue() 
>>> for i in range(100): 
...  q.put([range(200) for j in range(100)]) 
... 
>>> q.qsize() 
100 
>>> l=dump_queue(q) 
Queue has 100 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 99 items left 
>>> l=dump_queue(q) 
Queue has 99 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 3 items from the queue, queue has 96 items left 
>>> l=dump_queue(q) 
Queue has 96 items initially. 
Dumping complete: 
0 items were added to the queue. 
Extracted 1 items from the queue, queue has 95 items left 
>>> 

ここで何が起こっていますか?なぜ、すべての商品が投棄されていないのですか?

答えて

20

は、これを試してください

import Queue 
import time 

def dump_queue(queue): 
    """ 
    Empties all pending items in a queue and returns them in a list. 
    """ 
    result = [] 

    for i in iter(queue.get, 'STOP'): 
     result.append(i) 
    time.sleep(.1) 
    return result 

import multiprocessing 
q = multiprocessing.Queue() 
for i in range(100): 
    q.put([range(200) for j in range(100)]) 
q.put('STOP') 
l=dump_queue(q) 
print len(l) 

マルチプロセッシングキューバッファをオフに働く引っ張り、管にそれをフラッシュフィーダスレッドを有する内部バッファを有します。すべてのオブジェクトがフラッシュされていない場合、Emptyが時期尚早に生成されるケースがあります。待ち行列の終わりを示すためにセンチネルを使用することは安全で(信頼できる)。また、iter(get、sentinel)イディオムを使用するだけでEmptyに頼るよりも優れています。

私はフラッシングタイミングのために空を発生させるのが好きではありません(私は、フィーダスレッドへのコンテキスト切り替えを許可するためにtime.sleep(.1)を追加しました。必要ないかもしれません。それはGILをリリースする習慣です)。

+3

一般的な考え方Jesseは、より安全で信頼性の高いものでも、一般的な文字列ではなく、マルティンを 'uuid'文字列として使うことができます(マルチプロセッシングではなく、特定の' sentinel = object())。他のスレッドが同時に動いている場合でも、問題が発生する可能性があります。唯一の本当に安全な方法は、キューの内部に頼っている方法です、悲しいかな - ) –

+0

あなたはそうです。私は、文字列のセンチネルを使用して 'クイック'ソリューションに行ってきましたが、これはこの特定のケースでのみ機能します。私はmp.queueが待ち行列に組み込まれたいくつかのセンチネルサポートを必要としているのだろうかと疑問を持ち始めています – jnoller

+0

この答えをありがとう。私は今日このような問題を抱えていて、この反応が私を解決するのに役立った。完全な問題書き込み:http://www.bryceboe.com/2011/01/28/the-python-multiprocessing-queue-and-large-objects/ – bboe

3

状況によっては、すでにすべてを計算しているため、キューを変換するだけです。

shared_queue = Queue() 
shared_queue_list = [] 
... 
join() #All process are joined 
while shared_queue.qsize() != 0: 
    shared_queue_list.append(shared_queue.get()) 

ここでshared_queue_listは結果をリストに変換します。

関連する問題