2016-05-15 14 views
1

私は基本的なマルチプロセッシングを実装しようとしていますが、問題が発生しました。 Pythonスクリプトは以下に添付されています。マルチプロセッシングQueue.get()がハングアップ

import time, sys, random, threading 
from multiprocessing import Process 
from Queue import Queue 
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency 

append_queue = Queue(10) 
database = FrequencyStore() 

def add_to_append_queue(_list): 
    append_queue.put(_list) 

def process_append_queue(): 
    while True: 
     item = append_queue.get() 
     database.append(item) 
     print("Appended to database in %.4f seconds" % database.append_time) 
     append_queue.task_done() 
    return 

def main(): 
    database.load_db() 
    print("Database loaded in %.4f seconds" % database.load_time) 
    append_queue_process = Process(target=process_append_queue) 
    append_queue_process.daemon = True 
    append_queue_process.start() 
    #t = threading.Thread(target=process_append_queue) 
    #t.daemon = True 
    #t.start() 

    while True: 
     path = raw_input("file: ") 
     if path == "exit": 
      break 
     a = AnalyzeFrequency(path) 
     a.analyze() 
     print("Analyzed file in %.4f seconds" % a._time) 
     add_to_append_queue(a.get_results()) 

    append_queue.join() 
    #append_queue_process.join() 
    database.save_db() 
    print("Database saved in %.4f seconds" % database.save_time) 
    sys.exit(0) 

if __name__=="__main__": 
    main() 

AnalyzeFrequencyは、ファイル内の単語の頻度を分析し、get_results()は言った言葉と周波数のソートされたリストを返します。リストは非常に大きく、おそらく10000アイテムです。

このリストは、add_to_append_queueメソッドに渡され、キューに追加されます。 process_append_queueはアイテムを1つずつ取り出し、その頻度を「データベース」に追加します。この操作は実際の分析よりも少し遅くなりますので、このメソッドに別のプロセスを使用しようとしています。スレッド化モジュールでこれを試してみると、すべて正常に動作し、エラーは発生しません。プロセスを試してみると、スクリプトはitem = append_queue.get()でハングします。

ここで何が起こっているのか説明してください

すべての回答が高く評価されました。

ピクルスのエラーは私のせいだった

UPDATE、それだけでタイプミスでした。今、私はマルチプロセス内でQueueクラスを使用していますが、append_queue.get()メソッドはまだハングします。 NEW CODE

import time, sys, random 
from multiprocessing import Process, Queue 
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency 

append_queue = Queue() 
database = FrequencyStore() 

def add_to_append_queue(_list): 
    append_queue.put(_list) 

def process_append_queue(): 
    while True: 
     database.append(append_queue.get()) 
     print("Appended to database in %.4f seconds" % database.append_time) 
    return 

def main(): 
    database.load_db() 
    print("Database loaded in %.4f seconds" % database.load_time) 
    append_queue_process = Process(target=process_append_queue) 
    append_queue_process.daemon = True 
    append_queue_process.start() 
    #t = threading.Thread(target=process_append_queue) 
    #t.daemon = True 
    #t.start() 

    while True: 
     path = raw_input("file: ") 
     if path == "exit": 
      break 
     a = AnalyzeFrequency(path) 
     a.analyze() 
     print("Analyzed file in %.4f seconds" % a._time) 
     add_to_append_queue(a.get_results()) 

    #append_queue.join() 
    #append_queue_process.join() 
    print str(append_queue.qsize()) 
    database.save_db() 
    print("Database saved in %.4f seconds" % database.save_time) 
    sys.exit(0) 

if __name__=="__main__": 
    main() 

UPDATE 2

これは、データベース・コードです:

class FrequencyStore: 

    def __init__(self): 
     self.sorter = Sorter() 
     self.db = {} 
     self.load_time = -1 
     self.save_time = -1 
     self.append_time = -1 
     self.sort_time = -1 

    def load_db(self): 
     start_time = time.time() 

     try: 
      file = open("results.txt", 'r') 
     except: 
      raise IOError 

     self.db = {} 
     for line in file: 
      word, count = line.strip("\n").split("=") 
      self.db[word] = int(count) 
     file.close() 

     self.load_time = time.time() - start_time 

    def save_db(self): 
     start_time = time.time() 

     _db = [] 
     for key in self.db: 
      _db.append([key, self.db[key]]) 
     _db = self.sort(_db) 

     try: 
      file = open("results.txt", 'w') 
     except: 
      raise IOError 

     file.truncate(0) 
     for x in _db: 
      file.write(x[0] + "=" + str(x[1]) + "\n") 
     file.close() 

     self.save_time = time.time() - start_time 

    def create_sorted_db(self): 
     _temp_db = [] 
     for key in self.db: 
      _temp_db.append([key, self.db[key]]) 
     _temp_db = self.sort(_temp_db) 
     _temp_db.reverse() 
     return _temp_db 

    def get_db(self): 
     return self.db 

    def sort(self, _list): 
     start_time = time.time() 

     _list = self.sorter.mergesort(_list) 
     _list.reverse() 

     self.sort_time = time.time() - start_time 
     return _list 

    def append(self, _list): 
     start_time = time.time() 

     for x in _list: 
      if x[0] not in self.db: 
       self.db[x[0]] = x[1] 
      else: 
       self.db[x[0]] += x[1] 

     self.append_time = time.time() - start_time 
+1

'Queue.Queue'はプロセス間で動作しません。だから最初の変更は 'multiprocessing.Queue'を代わりに使うことです。 –

答えて

2

コメントは、Windows上でこれを実行しようとしている示唆しています。 Windowsは はfork()を持っていないので、各プロセスは、独自のキューを取得し、彼らは何の関係も を持っていない - 私は、Windows上でこれを実行している場合、それは動作しないことができ、コメントに

を言ったようにお互いに。モジュール全体がWindowsの各プロセス によって「最初から」インポートされます。 main()、 にキューを作成し、それをワーカー関数の引数として渡す必要があります。

ここまでで説明した問題とは関係ないので、私はすべてのデータベースを削除しましたが、ここでは移植性を高めるために必要なことについて説明します。出力は「明白な」ものである

def process_append_queue(append_queue): 
    while True: 
     x = append_queue.get() 
     if x is None: 
      break 
     print("processed %d" % x) 
    print("worker done") 

def main(): 
    import multiprocessing as mp 

    append_queue = mp.Queue(10) 
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,)) 
    append_queue_process.start() 
    for i in range(100): 
     append_queue.put(i) 
    append_queue.put(None) # tell worker we're done 
    append_queue_process.join() 

if __name__=="__main__": 
    main() 

:それはきれいなものをシャットダウンしないようにする通常だけの怠惰な方法だと、多くの場合、後にあなたを噛まないように戻ってくるではないので、私はまた、daemonあいを削除しました:

processed 0 
processed 1 
processed 2 
processed 3 
processed 4 
... 
processed 96 
processed 97 
processed 98 
processed 99 
worker done 

注:Windowsがないためではない(できない)fork()、それは継承 Windows上の任意のPythonオブジェクトにワーカープロセスのために不可能です。各プロセスは、最初からプログラム全体を実行します。そのため、元のプログラムは機能しませんでした。各プロセスは独自のQueueを作成し、他のプロセスのQueueとはまったく関係ありません。上記のアプローチでは、メインプロセスのみがQueueを作成し、メインプロセスはそれを(引数として)ワーカープロセスに渡します。

+0

コードが機能します!もう一つ、FrequencyAnalysis.pyを見てみると、FrequencyStorageクラスが表示されます。別のプロセスからappendメソッドを呼び出すと、そのクラスのインスタンス変数は更新されません。私はあなたがキューで行ったように、オブジェクトとしてパラメータを渡しています。 – skyguy126

+0

これはまったく異なる質問ですので、テストケースを可能な限り小さくし、別の質問を投稿してください。一般的に、プロセス間でオブジェクトがどのような突然変異をしているかは予想できません。メモリは共有されません。 'multiprocessing.Queue'は、プロセス間で見える_make_突然変異まで、地面から実装されているため動作します。これは魔法によって起こることはありません(プロセス間の突然変異についての情報を通信するアンダーカバーのプロセスパイプによって起こりますセマフォーは同時突然変異に対して保護する)。 –

+0

お手数ですが、よろしくお願いします。 – skyguy126

2

queue.Queueはスレッドセーフですが、プロセス間では動作しません。しかし、これは修正するのが非常に簡単です。代わりに:

from multiprocessing import Process 
from Queue import Queue 

あなたが欲しい:

from multiprocessing import Process, Queue 
+1

私は盲目的なスクリーンリーダーのユーザーで、イメージ内のトレースバックを読み取ることはできません。また、後であなたのトレースバックのテキストのためにGoogleの人々を助けるものでもありません。質問を更新するか、コメント内にトレースバックを投稿するか、または新しいコメントを作成することが好ましい。 –

+0

元の質問に更新を掲載しました。 – skyguy126

+0

データベースはプロセスに対して安全ではありません。メインのPythonプロセスでデータベースインスタンスを作成してから、Pythonに完全に異なるインスタンスで何かを実行するように指示します。したがって、 'database.append'への呼び出しは' Queue.get'への呼び出しではなく、ハングしています。キューを使用することの全ポイントは、この正確な問題を回避することです。 –

関連する問題