2012-12-20 2 views
6

私はマルチプロセッシングモジュールを使用して非常に大きなタスクを分割しています。それはほとんどの部分で機能しますが、私のデザインでは何かが明らかに欠けているはずです。これは、すべてのデータがいつ処理されたかを効果的に伝えることが難しいためです。マルチプロセッシング - プロデューサ/コンシューマデザイン

私は2つの別々のタスクを実行しています。もう一方は他の人に供給されます。私はこれが生産者/消費者の問題だと思います。私は、プロデューサがキューをいっぱいにし、コンシューマがキューから読み込んで処理を行う、すべてのプロセス間で共有キューを使用します。問題は有限の量のデータがあることです。したがって、ある時点では、すべてのデータが処理されたことを誰もが知っている必要があるため、システムを正常にシャットダウンできます。

map_async()関数を使用するのが理にかなっているようですが、プロデューサがキューをいっぱいにしているので、すべての項目がわからないのでwhileループに入ります。 apply_async()を使用して、何らかのタイムアウトですべてが完了したことを検出しようとする...醜い。

私は何かが明らかに欠けているように感じる。どのようにこれをよりうまく設計できますか?ここで

PRODCUER

class ProducerProcess(multiprocessing.Process): 
    def __init__(self, item, consumer_queue): 
     self.item = item 
     self.consumer_queue = consumer_queue 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     for record in get_records_for_item(self.item): # this takes time 
      self.consumer_queue.put(record) 

def start_producer_processes(producer_queue, consumer_queue, max_running): 
    running = [] 

    while not producer_queue.empty(): 
     running = [r for r in running if r.is_alive()] 
     if len(running) < max_running: 
      producer_item = producer_queue.get() 
      p = ProducerProcess(producer_item, consumer_queue) 
      p.start() 
      running.append(p) 
     time.sleep(1) 

CONSUMER

def process_consumer_chunk(queue, chunksize=10000): 
    for i in xrange(0, chunksize): 
     try: 
      # don't wait too long for an item 
      # if new records don't arrive in 10 seconds, process what you have 
      # and let the next process pick up more items. 

      record = queue.get(True, 10) 
     except Queue.Empty:     
      break 

     do_stuff_with_record(record) 

MAIN

if __name__ == "__main__": 
    manager = multiprocessing.Manager() 
    consumer_queue = manager.Queue(1024*1024) 
    producer_queue = manager.Queue() 

    producer_items = xrange(0,10) 

    for item in producer_items: 
     producer_queue.put(item) 

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8)) 
    p.start() 

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1) 

それは安っぽい取得する場所です。消費するリストが同時にいっぱいになっているので、マップを使用できません。だから私はwhileループに入り、タイムアウトを検出しようとする必要があります。 consumer_queueは、プロデューサがまだそれを埋めようとしている間に空になる可能性があるので、空のキューを検出するだけでは終了できません。

timed_out = False 
    timeout= 1800 
    while 1: 
     try: 
      result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,)) 
      if timed_out: 
       timed_out = False 

     except Queue.Empty: 
      if timed_out: 
       break 

      timed_out = True 
      time.sleep(timeout) 
     time.sleep(1) 

    consumer_queue.join() 
    consumer_pool.close() 
    consumer_pool.join() 

私はメインスレッド内のレコード)多分私は(得ることができると考え、代わりにキューを渡すの消費者にそれらを渡すが、私はそのように同じ問題で終わると思います。 whileループを実行してapply_async()を使用する必要があります。事前にアドバイスをありがとう!

答えて

2

manager.Eventを使用して作業の終了を知らせることができます。このイベントは、すべてのプロセス間で共有することができます。その後、メインプロセスから通知すると、他のワーカーは正常にシャットダウンできます。

while not event.is_set(): 
...rest of code... 

したがって、コンシューマーは、イベントが設定されるのを待って、設定されるとクリーンアップを処理します。

このフラグをいつ設定するかを決定するには、プロデューサスレッドでjoinを実行し、すべてが完了したらコンシューマスレッドに参加できます。

+0

これはうまくいくと思います。ありがとうございました!あなたの説明からjoin()がどのように動作するのかよく分かりませんが、私は方法を見つけたと思います。イベントをstart_producer_process()プロセスに渡し、すべてのプロデューサがconsumer_queueへの追加を完了した後にset()します。その時点(メインスレッドの後ろ)で、consumer_queueが空になると、すべてが処理されたので、whileループを安全に中断することができます。 – user1914881

+0

混乱している部分は申し訳ありませんが、メインスレッドに参加すると、プロデューサが終了してコンシューマが作業を開始した直後にプログラムを終了することはありません。 – sean

0

マルチプロセス/スレッド化の代わりにSimPyをお勧めします。離散イベントシミュレーション。

関連する問題