2012-03-09 20 views
0

私は基本的な質問をしている場合は私の前にmultiprocessingと一緒に働いたことがありません。進捗状況のマルチプロセッシング

This answer私は自分のニーズに適応した非常に素晴らしい処理クラスを提供しました。私はprintステートメントを使用してテストしている基本的なプログレスバーを実装しようとしていますが、全く動作していません(出力はありません)。私の現在のコードはこれです

:いくつかのテストの後

class ParsingMaster(object): 
    def __init__(self, parser, input_file, output_file): 
    self.parser = parser 

    self.num_processes = cpu_count() 
    self.input_file = input_file 
    self.output_file = output_file 

    self.input_queue = Queue() 
    self.output_queue = Queue() 

    self.input_size = 0 

    self.input_process = Process(target=self.parse_input) 
    self.output_process = Process(target=self.write_output) 
    self.processes = [Process(target=self.process_row) for row in range(self.num_processes)] 

    self.input_process.start() 
    self.output_process.start() 

    for process in self.processes: 
     process.start() 

    self.input_process.join() 

    for process in self.processes: 
     process.join() 

    self.output_process.join() 

    def parse_input(self): 
    for index, row in enumerate(self.input_file): 
     self.input_queue.put([index, row]) 
     self.input_size = self.input_queue.qsize() 

    for i in range(self.num_processes): 
     self.input_queue.put('STOP') 

    def process_row(self): 
    for index, row in iter(self.input_queue.get, 'STOP'): 
     self.output_queue.put([index, row[0], self.parser.parse(row[1])]) 

    self.output_queue.put('STOP') 

    def write_output(self): 
    current = 0 
    buffer = {} 

    for works in range(self.num_processes): 
     for index, id, row in iter(self.output_queue.get, 'STOP'): 
     if index != current: 
      buffer[index] = [id] + row 
     else: 
      self.output_file.writerow([id] + row) 
      current += 1 

      while current in buffer: 
      self.output_file.writerow(buffer[current]) 
      del buffer[current] 
      current += 1 

      if self.input_size: 
       print float(current * 100)/float(self.input_size) 

、私はいくつかの奇妙なものを発見しました:

  • self.input_sizeparse_input()で正しく更新されます。
  • parse_input()が終了し、write_output()がまだ実行中です。
  • write_output()常には、self.input_size = 0と報告しています。

ここでどこが間違っているのか教えていただけますか?助けてください、事前にありがとうございます。

答えて

2

self.input_sizeはプロセスローカル変数です。各プロセスには独自のコピーがあります。 multiprocessing documentationによれば、データを共有するには、ValueArrayのようなコンテナにデータをラップする必要があります。

+0

ありがとうございました。私は1時間ほど前にそれを理解することができたが、正しい方向に進んでいることを知ってもいい。 – Blender

関連する問題