2017-03-10 8 views
1

私はPythonで苦労している単純な問題の例があります。私は、関数 "Thread_Test()"が0〜1の間隔で乱数の一様な配列を生成し、配列中の "Sample_Size"個のデータ点を持つ例を実行するためにマルチプロセスを使用しています。このサンプルを入手すると、コード実行を高速化するためにプロセスの複数のコピーを生成する予定です。次に、Thread_Test()にもっと複雑な計算セットを追加します。このサンプルは、Sample_Sizeを9,000以下に保つ限り、正常に動作します。 Sample_Sizeを10から8,000に増やすと実行時間が増えますが、8,000で実行するには0.01秒しかかかりません。しかし、Sample_Sizeを9,000に増やすとすぐに、コードは永久に実行され、計算が完了することはありません。これを引き起こしているのは何ですか? (あなたが見るようにプロデューサー)あなたは、あなたが同時に要素を取得し、メインプロセス(消費者)を約束しなければならないサブプロセスでキューにかなっを置けばPythonでマルチプロセスを正しく使用する方法

from multiprocessing import Process, Queue 
import queue 
import random 
import timeit 
import numpy as np 

def Thread_Test(Sample_Size): 
    q.put(np.random.uniform(0,1,Sample_Size)) 
    return 

if __name__ == "__main__": 
    Sample_Size = 9000 
    q = Queue() 
    start = timeit.default_timer() 
    p = Process(target=Thread_Test,args=(Sample_Size,)) 
    p.start() 
    p.join() 

    result = np.array([]) 
    while True: 
     if not q.empty(): 
     result = np.append(result,q.get()) 
     else: 
      break 
    print (result) 

    stop = timeit.default_timer() 
    print ('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds")) 
+0

あなたがタイムアウトを指定した場合'join'関数は、それを返します。しかし、これは根本的な問題に対処しません。 – autodidacticon

+0

[python multiprocessing - 重複キューの処理でハングアップする可能性がある]の重複している可能性があります(http://stackoverflow.com/questions/21641887/python-multiprocessing-process-hangs-on-join-for-large-queue) – autodidacticon

答えて

1

は問題は起こりました。それ以外の場合、メインプロセスは "p.join()"で待機し、サブプロセスは "Queue.put"で待機しますが、キュー内のelemが多すぎるため、消費者は新しいelemを確保する余地がありません。 DOC hereとして

Bear in mind that a process that has put items in a queue will wait before terminating until 
all the buffered items are fed by the “feeder” thread to the underlying pipe 

だから、簡単な言葉では、あなたが前に "の部分を取得" コールする必要がある "(p.join)"。サブプロセスが動作する前に、メインプロセスが終了を心配している場合

は、あなたは少し下のようなコードを変更することがあります。

while True: 
    # check subprocess running before break 
    running = p.is_alive() 
    if not q.empty(): 
     result = np.append(result,q.get()) 
    else: 
     if not running: 
      break 

全体の一部は下記のお気に入り:

def Thread_Test(q, Sample_Size): 
    q.put(np.random.uniform(0,1,Sample_Size)) 


if __name__ == "__main__": 
    Sample_Size = 9000 
    q = Queue() 
    start = timeit.default_timer() 
    p = Process(target=Thread_Test,args=(q, Sample_Size,)) 
    p.daemon = True 
    p.start() 

    result = np.array([]) 
    while True: 
     running = p.is_alive() 
     if not q.empty(): 
      result = np.append(result,q.get()) 
     else: 
      if not running: 
       break 
    p.join() 
    stop = timeit.default_timer() 
    print ('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds")) 
+0

あなたの答え信じられないほど助けになりました、ありがとう! – Jon

+0

あなたは歓迎です:) – linpingta

関連する問題