2016-07-28 5 views
1

私は次のようにmultiprocessing.queues.JoinableQueueを使用しています。マルチプロセッシング.Queueは消え去っているようですか? OS(パイプ破壊)対Python?

非常に長時間実行されているスレッド(複数日間)がXML用のAPIをポーリングします。これを行うスレッドは、XMLをオブジェクトに単純に解析し、これらをキューに入れます。

各オブジェクトの処理には、XMLの解析よりもかなり時間がかかり、APIから読み取るスレッドには決して依存しません。このように、このマルチプロセッシングの実装は単純である。

プロセスを作成し、クリーンアップするためのコードはここにある:

def queueAdd(self, item): 
     try: 
      self.queue.put(item) 
     except AssertionError: 
      #queue has been closed, remake it (let the other GC) 
      logger.warn('Queue closed early.') 
      self.queue = BufferQueue(ctx=multiprocessing.get_context()) 
      self.queue.put(item) 
     except BrokenPipeError: 
      #workaround for pipe issue 
      logger.warn('Broken pipe, Forcing creation of new queue.') 
      # all reading procesess should suicide and new ones spawned. 
      self.queue = BufferQueue(ctx=multiprocessing.get_context()) 
#    address = 'localhost' 
#    if address in multiprocessing.managers.BaseProxy._address_to_local: 
#     del BaseProxy._address_to_local[address][0].connection 
      self.queue.put(item) 
     except Exception as e: 
      #general thread exception. 
      logger.error('Buffer queue exception %s' % e) 
      #TODO: continue trying/trap exceptions? 
      raise 
     # check for finished consumers and clean them up before we check to see 
     # if we need to add additional consumers. 
     for csmr in self.running: 
      if not csmr.is_alive(): 
       debug('Child dead, releasing.') 
       self.running.remove(csmr) 

     #see if we should start a consumer... 
     # TODO: add min/max processes (default and override) 
     if not self.running: 
      debug('Spawning consumer.') 
      new_consumer = self.consumer(
        queue=self.queue, 
        results_queue=self.results_queue, 
        response_error=self.response_error) 
      new_consumer.start() 
      self.running.append(new_consumer) 

消費者のプロセス制御ループは、同様に非常に単純です:

def run(self): 
     '''Consumes the queue in the framework, passing off each item to the 
     ItemHandler method. 
     ''' 
     while True: 
      try: 
       item = self.queue.get(timeout=3) 
       #the base class just logs this stuff 
       rval = self.singleItemHandler(item) 
       self.queue.task_done() 
       if rval and self.results_queue: 
        self.results_queue.put(rval) 
      except queue.Empty: 
       logging.debug('Queue timed out after 3 seconds.') 
       break 
      except EOFError: 
       logging.info(
        '%s has finished consuming queue.' % (__class__.__name__)) 
       break 
      except Exception as e: 
       #general thread exception. 
       self.logger.error('Consumer exception %s' % e) 
       #TODO: continue trying/trap exceptions? 
       raise 

しばらくして(成功した処理の時間について)、消費者プロセスがDEBUG:root:Queue timed out after 3 seconds.のタイムアウトのために死んだことを示すログメッセージが表示されますが、キューはまだ開いており、明らかに元のスレッドによって書き込まれています。このスレッドは、コンシューマプロセスが終了したとは思わない(queueAddメソッドを参照)。新しいスレッドを開始しようとしない。キューは空ではないように見えますが、キューからの読み取りはタイムアウトしたようです。

私はマネージャーが子供がまだ生きていると思う理由のために紛失しています。

編集


私はBrokenPipeErrorが切断された接続のクリーンアップを削除するだけでなく、ログに記録される方法へのコード変更に伴う元の質問を変更しました。それは別の問題だと私は考えている。

+0

慎重にこの問題を検討している間、私は私がここで説明BrokenPipe例外のためにログ出力を持っていなかったことに気づい:http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python -multiprocessing-managers-basemanager-syncmanager – SkyLeach

+0

上記のコメントに記載されているように追加のログを追加しましたが、別のテストでBrokenPipeErrorに関するログメッセージが得られませんでした。 – SkyLeach

答えて

0

問題は、マルチプロセッシングで微妙​​な現実が原因です。 queue.putを呼び出したANYプロセスは、名前付きパイプに書き込むバックグラウンドスレッドを実行します。

私の場合、結果キューに何らかの理由で処理できなかった項目がかなりの量ではありませんが、それでもパイプを満たして原因を突き止めるには十分でした実行していないにもかかわらず消費者が終了しないようにする。これは、書き込みキューがゆっくりといっぱいになることにつながります。

解決策は、すべての結果が取得されることを保証する最後の(ブロッキング)呼び出しを除いて、これまで使用可能なすべての結果を読み取るために、api呼び出しの次の反復のためにノンブロッキング呼び出しを変更したことです。

def finish(self, block=True, **kwargs): 
    ''' 
    Notifies the buffer that we are done filling it. 
    This command binds to any processes still running and lets them 
    finish and then copies and flushes the managed results list. 
    ''' 
    # close the queue and wait until it is consumed 
    if block: 
     self.queue.close() 
     self.queue.join_thread() 
     # make sure the consumers are done consuming the queue 
     for csmr in self.running: 
      #get everything on the results queue right now. 
      try: 
       while csmr.is_alive(): 
        self.results_list.append(
         self.results_queue.get(timeout=0.5)) 
        self.results_queue.task_done() 
      except queue.Empty: 
       if csmr.is_alive(): 
        logger.warn('Result queue empty but consumer alive.') 
        logger.warn('joining %s.' % csmr.name) 
        csmr.join() 
     del self.running[:] 
     if self.callback: 
      return self.callback(self.results_list) 
    else: 
     #read results immediately available. 
     try: 
      while True: 
       self.results_list.append(self.results_queue.get_nowait()) 
       self.results_queue.task_done() 
     except queue.Empty: 
      #got everything on the queue so far 
      pass 
    return self.results_list 
関連する問題