2012-04-05 6 views
11

私はかなり大きなプロジェクトをPythonで実行しています。これは、コンピューティングを必要とするバックグラウンドタスクの1つを別のコアにオフロードしなければならないため、メインサービスが遅くならないようにしています。私は、multiprocessing.Queueを使用してワーカープロセスからの結果を伝えるときに、明らかに奇妙な挙動に遭遇しました。比較目的のために同じキューをthreading.Threadmultiprocessing.Processに使用すると、スレッドはうまく動作しますが、大きなアイテムをキューに入れた後にプロセスが結合に失敗します。お守り:私はこれはsize = 10000のために正常に動作しますが、size = 100000ためworker_p.join()でハングアップしていることを見てきましたマルチプロセッシングの最大サイズ.Queue item?

import threading 
import multiprocessing 

class WorkerThread(threading.Thread): 
    def __init__(self, queue, size): 
     threading.Thread.__init__(self) 
     self.queue = queue 
     self.size = size 

    def run(self): 
     self.queue.put(range(size)) 


class WorkerProcess(multiprocessing.Process): 
    def __init__(self, queue, size): 
     multiprocessing.Process.__init__(self) 
     self.queue = queue 
     self.size = size 

    def run(self): 
     self.queue.put(range(size)) 


if __name__ == "__main__": 
    size = 100000 
    queue = multiprocessing.Queue() 

    worker_t = WorkerThread(queue, size) 
    worker_p = WorkerProcess(queue, size) 

    worker_t.start() 
    worker_t.join() 
    print 'thread results length:', len(queue.get()) 

    worker_p.start() 
    worker_p.join() 
    print 'process results length:', len(queue.get()) 

。 にはmultiprocessing.Processのインスタンスに入れることができる固有のサイズ制限がありますか?あるいは、ここで明白で根本的な間違いをしていますか?

参考までに、私はUbuntu 10.04でPython 2.6.5を使用しています。

答えて

16

フィーダスレッドがパイプへの書き込みをブロックするように(実際にはパイプを並行アクセスから保護するロックを取得しようとしているとき)、下にあるパイプがいっぱいに見えます。

チェックこの問題http://bugs.python.org/issue8237

+2

ありがとうございます。これは私が経験している問題です。参加する前に親スレッドでデキューすると正常に動作しているようです。 –

+1

ありがとうございました。 2行をスワップするだけです。 "worker_t.join() print 'スレッドの結果の長さ:'、len(queue.get())" – Catbuilts

2

デフォルトでは、キューの最大サイズは無限ですが、あなたはそれを超えています。あなたのケースでは、キューの最大サイズは無制限ですが、 worker_pはアイテムをキューに入れていますが、キューは解放されていなければなりませんe参加を呼び出す。詳細は下のリンクを参照してください。 https://docs.python.org/2/library/multiprocessing.html#programming-guidelines