私は次のようにmultiprocessing.queues.JoinableQueueを使用しています。マルチプロセッシング.Queueは消え去っているようですか? OS(パイプ破壊)対Python?
非常に長時間実行されているスレッド(複数日間)がXML用のAPIをポーリングします。これを行うスレッドは、XMLをオブジェクトに単純に解析し、これらをキューに入れます。
各オブジェクトの処理には、XMLの解析よりもかなり時間がかかり、APIから読み取るスレッドには決して依存しません。このように、このマルチプロセッシングの実装は単純である。
プロセスを作成し、クリーンアップするためのコードはここにある:
def queueAdd(self, item):
try:
self.queue.put(item)
except AssertionError:
#queue has been closed, remake it (let the other GC)
logger.warn('Queue closed early.')
self.queue = BufferQueue(ctx=multiprocessing.get_context())
self.queue.put(item)
except BrokenPipeError:
#workaround for pipe issue
logger.warn('Broken pipe, Forcing creation of new queue.')
# all reading procesess should suicide and new ones spawned.
self.queue = BufferQueue(ctx=multiprocessing.get_context())
# address = 'localhost'
# if address in multiprocessing.managers.BaseProxy._address_to_local:
# del BaseProxy._address_to_local[address][0].connection
self.queue.put(item)
except Exception as e:
#general thread exception.
logger.error('Buffer queue exception %s' % e)
#TODO: continue trying/trap exceptions?
raise
# check for finished consumers and clean them up before we check to see
# if we need to add additional consumers.
for csmr in self.running:
if not csmr.is_alive():
debug('Child dead, releasing.')
self.running.remove(csmr)
#see if we should start a consumer...
# TODO: add min/max processes (default and override)
if not self.running:
debug('Spawning consumer.')
new_consumer = self.consumer(
queue=self.queue,
results_queue=self.results_queue,
response_error=self.response_error)
new_consumer.start()
self.running.append(new_consumer)
消費者のプロセス制御ループは、同様に非常に単純です:
def run(self):
'''Consumes the queue in the framework, passing off each item to the
ItemHandler method.
'''
while True:
try:
item = self.queue.get(timeout=3)
#the base class just logs this stuff
rval = self.singleItemHandler(item)
self.queue.task_done()
if rval and self.results_queue:
self.results_queue.put(rval)
except queue.Empty:
logging.debug('Queue timed out after 3 seconds.')
break
except EOFError:
logging.info(
'%s has finished consuming queue.' % (__class__.__name__))
break
except Exception as e:
#general thread exception.
self.logger.error('Consumer exception %s' % e)
#TODO: continue trying/trap exceptions?
raise
しばらくして(成功した処理の時間について)、消費者プロセスがDEBUG:root:Queue timed out after 3 seconds.
のタイムアウトのために死んだことを示すログメッセージが表示されますが、キューはまだ開いており、明らかに元のスレッドによって書き込まれています。このスレッドは、コンシューマプロセスが終了したとは思わない(queueAddメソッドを参照)。新しいスレッドを開始しようとしない。キューは空ではないように見えますが、キューからの読み取りはタイムアウトしたようです。
私はマネージャーが子供がまだ生きていると思う理由のために紛失しています。
編集
私はBrokenPipeErrorが切断された接続のクリーンアップを削除するだけでなく、ログに記録される方法へのコード変更に伴う元の質問を変更しました。それは別の問題だと私は考えている。
慎重にこの問題を検討している間、私は私がここで説明BrokenPipe例外のためにログ出力を持っていなかったことに気づい:http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python -multiprocessing-managers-basemanager-syncmanager – SkyLeach
上記のコメントに記載されているように追加のログを追加しましたが、別のテストでBrokenPipeErrorに関するログメッセージが得られませんでした。 – SkyLeach