私はマルチプロセッシングモジュールを使用して非常に大きなタスクを分割しています。それはほとんどの部分で機能しますが、私のデザインでは何かが明らかに欠けているはずです。これは、すべてのデータがいつ処理されたかを効果的に伝えることが難しいためです。マルチプロセッシング - プロデューサ/コンシューマデザイン
私は2つの別々のタスクを実行しています。もう一方は他の人に供給されます。私はこれが生産者/消費者の問題だと思います。私は、プロデューサがキューをいっぱいにし、コンシューマがキューから読み込んで処理を行う、すべてのプロセス間で共有キューを使用します。問題は有限の量のデータがあることです。したがって、ある時点では、すべてのデータが処理されたことを誰もが知っている必要があるため、システムを正常にシャットダウンできます。
map_async()関数を使用するのが理にかなっているようですが、プロデューサがキューをいっぱいにしているので、すべての項目がわからないのでwhileループに入ります。 apply_async()を使用して、何らかのタイムアウトですべてが完了したことを検出しようとする...醜い。
私は何かが明らかに欠けているように感じる。どのようにこれをよりうまく設計できますか?ここで
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
CONSUMER
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
MAIN
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
それは安っぽい取得する場所です。消費するリストが同時にいっぱいになっているので、マップを使用できません。だから私はwhileループに入り、タイムアウトを検出しようとする必要があります。 consumer_queueは、プロデューサがまだそれを埋めようとしている間に空になる可能性があるので、空のキューを検出するだけでは終了できません。
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
私はメインスレッド内のレコード)多分私は(得ることができると考え、代わりにキューを渡すの消費者にそれらを渡すが、私はそのように同じ問題で終わると思います。 whileループを実行してapply_async()を使用する必要があります。事前にアドバイスをありがとう!
これはうまくいくと思います。ありがとうございました!あなたの説明からjoin()がどのように動作するのかよく分かりませんが、私は方法を見つけたと思います。イベントをstart_producer_process()プロセスに渡し、すべてのプロデューサがconsumer_queueへの追加を完了した後にset()します。その時点(メインスレッドの後ろ)で、consumer_queueが空になると、すべてが処理されたので、whileループを安全に中断することができます。 – user1914881
混乱している部分は申し訳ありませんが、メインスレッドに参加すると、プロデューサが終了してコンシューマが作業を開始した直後にプログラムを終了することはありません。 – sean