2016-04-01 3 views
2

python2.7では、関数内から初期化されたときにmultiprocessing.Queueが壊れたエラーをスローします。私は問題を再現する最小の例を提供しています。マルチプロセッシングでパイプエラーが発生しました.Queue

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 

def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     q.put(i) 

if __name__ == "__main__": 
    main() 

私はなぜ解読することができません下の壊れたパイプのエラー

Traceback (most recent call last): 
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed 
send(obj) 
IOError: [Errno 32] Broken pipe 

Process finished with exit code 0 

をスローします。確かに、関数内からQueueオブジェクトを取り込むことができないのは間違いです。

答えて

3

ここで起こることは、Queueを作成し、そこに10個のオブジェクトを入れて、Queueを含むすべての内部変数とオブジェクトをガベージコレクションする関数を終了します。 まだ最後の番号をQueueに送信しようとしているため、このエラーが発生します。

「プロセスは、第一フィーダスレッドは パイプにバッファからオブジェクトを転送するが開始されるキューにアイテムを置く場合、」:ドキュメントdocumentationから

put()が別のスレッドで行われると、それは、スクリプトの実行をブロックして、キュー操作を完了する前にmain()機能を終了することができますされていません。

これを試してみてください:オブジェクトがQueueに置かれるまで

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 
import time 
def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    time.sleep(0.1) # Just enough to let the Queue finish 

if __name__ == "__main__": 
    main() 

joinにキューまたはブロック実行方法があるはずです、あなたはドキュメントの外観を取る必要があります。

+0

ワンダフル答えです。私はpython3ではこれが起こらないことを述べる別のものを与えるつもりです。 – hAcKnRoCk

0

@ HarryPotFleurで提案されているようにtime.sleep(0.1)を使用して遅延させると、この問題は解決されます。しかし、Python3でコードをテストしたところ、python3ではパイプの破損の問題はまったく起こりませんでした。私はこれがバグとして報告され、後に修正されたと思います。

+0

これはpython3ではまったく起こらない** **本当です** more 'time.sleep(0.1)'は解決しません!それは理解のためだけだった! –

1

Queue.put()を起動すると、暗黙的なスレッドが開始され、データがキューに配信されます。一方、メインアプリケーションは終了し、データの終了ステーションはありません(キューオブジェクトはガベージコレクションされます)。私はこのしようとするだろう

from multiprocessing import Queue 

def main(): 
    q = Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    q.close() 
    q.join_thread() 

if __name__ == "__main__": 
    main() 

join_thread()性を保証し、バッファ内のすべてのデータがフラッシュされています。 close()は事前に電話する必要がありますjoin_thread()

関連する問題