0
私は基本的な質問をしている場合は私の前にmultiprocessing
と一緒に働いたことがありません。進捗状況のマルチプロセッシング
This answer私は自分のニーズに適応した非常に素晴らしい処理クラスを提供しました。私はprint
ステートメントを使用してテストしている基本的なプログレスバーを実装しようとしていますが、全く動作していません(出力はありません)。私の現在のコードはこれです
:いくつかのテストの後
class ParsingMaster(object):
def __init__(self, parser, input_file, output_file):
self.parser = parser
self.num_processes = cpu_count()
self.input_file = input_file
self.output_file = output_file
self.input_queue = Queue()
self.output_queue = Queue()
self.input_size = 0
self.input_process = Process(target=self.parse_input)
self.output_process = Process(target=self.write_output)
self.processes = [Process(target=self.process_row) for row in range(self.num_processes)]
self.input_process.start()
self.output_process.start()
for process in self.processes:
process.start()
self.input_process.join()
for process in self.processes:
process.join()
self.output_process.join()
def parse_input(self):
for index, row in enumerate(self.input_file):
self.input_queue.put([index, row])
self.input_size = self.input_queue.qsize()
for i in range(self.num_processes):
self.input_queue.put('STOP')
def process_row(self):
for index, row in iter(self.input_queue.get, 'STOP'):
self.output_queue.put([index, row[0], self.parser.parse(row[1])])
self.output_queue.put('STOP')
def write_output(self):
current = 0
buffer = {}
for works in range(self.num_processes):
for index, id, row in iter(self.output_queue.get, 'STOP'):
if index != current:
buffer[index] = [id] + row
else:
self.output_file.writerow([id] + row)
current += 1
while current in buffer:
self.output_file.writerow(buffer[current])
del buffer[current]
current += 1
if self.input_size:
print float(current * 100)/float(self.input_size)
、私はいくつかの奇妙なものを発見しました:
self.input_size
はparse_input()
で正しく更新されます。parse_input()
が終了し、write_output()
がまだ実行中です。write_output()
常には、self.input_size = 0
と報告しています。
ここでどこが間違っているのか教えていただけますか?助けてください、事前にありがとうございます。
ありがとうございました。私は1時間ほど前にそれを理解することができたが、正しい方向に進んでいることを知ってもいい。 – Blender