2017-03-03 16 views
2

私は次の問題に直面しています。ファイルを更新する関数を並列化しようとしていますが、OSError: [Errno 12] Cannot allocate memoryのためにPool()を開始できません。私はサーバーを見回し始めました。私は古い、弱いものを実際のメモリから使い分けているようなものではありません。 htopを参照してください: enter image description here そして、私はで動作するようにしようとしているファイルは、いずれかのその大きなではありません。また enter image description here を、free -mは、私は、スワップメモリ​​の〜7ギガバイトに加えて、利用可能なRAMをたくさん持って示しています。私はコード(およびスタックトレース)を下に貼り付けます。サイズは以下の通りです:Pythonマルチプロセッシング - OSErrorのデバッグ:[Errno 12]メモリを割り当てることができません

predictionmatrixデータフレームは約ca. 80MB(pandasdataframe.memory_usage()) ファイルgeo.geojsonは2MBです

これをデバッグするにはどうすればよいですか?どうすれば確認できますか?ヒント/トリックありがとう!

コード:

def parallelUpdateJSON(paramMatch, predictionmatrix, data): 
    for feature in data['features']: 
     currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] 
     if (len(currentfeature) > 0): 
      feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) 
     else: 
      feature['properties'].update({"style": {"opacity": 0}}) 

def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): 
    with open('geo.geojson') as f: 
     data = json.load(f) 
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
    pool = Pool() 
    func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
    pool.map(func, data) 
    pool.close() 
    pool.join() 

    with open('output.geojson', 'w') as outfile: 
     json.dump(data, outfile) 

スタックトレース:

global g_predictionmatrix 

def worker_init(predictionmatrix): 
    global g_predictionmatrix 
    g_predictionmatrix = predictionmatrix  

def parallelUpdateJSON(paramMatch, data_item): 
    for feature in data_item['features']: 
     currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] 
     if (len(currentfeature) > 0): 
      feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) 
     else: 
      feature['properties'].update({"style": {"opacity": 0}}) 

def use_the_pool(data, paramMatch, predictionmatrix): 
    pool = Pool(initializer=worker_init, initargs=(predictionmatrix,)) 
    func = partial(parallelUpdateJSON, paramMatch) 
    pool.map(func, data) 
    pool.close() 
    pool.join() 


def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): 
    with open('geo.geojson') as f: 
     data = json.load(f) 
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
    use_the_pool(data, paramMatch, predictionmatrix)  
    with open('trentino-grid.geojson', 'w') as outfile: 
     json.dump(data, outfile) 

--------------------------------------------------------------------------- 
OSError         Traceback (most recent call last) 
<ipython-input-428-d6121ed2750b> in <module>() 
----> 1 writeGeoJSON(6, 15, baseline) 

<ipython-input-427-973b7a5a8acc> in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix) 
    14  print("Start loop") 
    15  paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
---> 16  pool = Pool(2) 
    17  func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
    18  print(predictionmatrix.memory_usage()) 

/usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild) 
    116   from .pool import Pool 
    117   return Pool(processes, initializer, initargs, maxtasksperchild, 
--> 118      context=self.get_context()) 
    119 
    120  def RawValue(self, typecode_or_type, *args): 

/usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context) 
    166   self._processes = processes 
    167   self._pool = [] 
--> 168   self._repopulate_pool() 
    169 
    170   self._worker_handler = threading.Thread(

/usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self) 
    231    w.name = w.name.replace('Process', 'PoolWorker') 
    232    w.daemon = True 
--> 233    w.start() 
    234    util.debug('added worker') 
    235 

/usr/lib/python3.5/multiprocessing/process.py in start(self) 
    103    'daemonic processes are not allowed to have children' 
    104   _cleanup() 
--> 105   self._popen = self._Popen(self) 
    106   self._sentinel = self._popen.sentinel 
    107   _children.add(self) 

/usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj) 
    265   def _Popen(process_obj): 
    266    from .popen_fork import Popen 
--> 267    return Popen(process_obj) 
    268 
    269  class SpawnProcess(process.BaseProcess): 

/usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj) 
    18   sys.stderr.flush() 
    19   self.returncode = None 
---> 20   self._launch(process_obj) 
    21 
    22  def duplicate_for_child(self, fd): 

/usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj) 
    65   code = 1 
    66   parent_r, child_w = os.pipe() 
---> 67   self.pid = os.fork() 
    68   if self.pid == 0: 
    69    try: 

OSError: [Errno 12] Cannot allocate memory 

UPDATE

@のrobyschekのソリューションによると、私は自分のコードを更新しました210

そして、私はまだ同じエラーが発生します。また、documentationによれば、map()は私のdataをチャンクに分割しなければならないので、私は80MBの複製時間を複製する必要はないと思います。私は間違っているかもしれません... :) プラス私は小さな入力(〜11MBの代わりに80メガバイト)を使用する場合、私はエラーが表示されないことに気づいた。だから私はあまりにも多くのメモリを使用しようとしていると思いますが、80MBからRAMの16GBまでの処理方法を想像することはできません。

+0

申し訳ありませんが、私は、スタックトレースを読んで怠惰だったとエラーが 'OSで発生していることに気づいていませんでした.fork'。 また、私はマルチプロセッシングのソースを調べて、 私の 'predictionmatrix'の複製についての理論は、 ' Pool.imap'と 'chunkksize'を小文字にするだけで、Pool.mapはデフォルトでは影響を受けません。 私は自分の答えを削除しました。 – robyschek

答えて

1

私たちはこれを数回持っていました。私のsysadminによると、unixには "バグ"があります。これはメモリ不足の場合、プロセスが最大ファイル記述子の制限に達した場合に同じエラーを発生させます。

ファイル記述子のリークがあり、エラー発生は[Errno 12]です。メモリ#012OSErrorを割り当てることができません。問題は、あまりにも多くのFDの作成ではない場合

つまり、あなたのスクリプトとダブルチェックを見なければならない代わりに

3

multiprocessing.Poolを使用する場合、プロセスを開始するデフォルトの方法はforkです。 forkの問題は、プロセス全体が複製されていることです。 (see details here)。したがって、メインプロセスがすでに多くのメモリを使用している場合、このメモリは複製され、MemoryErrorに達します。たとえば、メインプロセスでメモリの2GBを使用し、8つのサブプロセスを使用する場合は、RAMに18GBが必要です。

あなたは、このような'forkserver''spawn'と異なるstartメソッドを使用してみてください。

from multiprocessing import set_start_method, Pool 
set_start_method('forkserver') 

# You can then start your Pool without each process 
# cloning your entire memory 
pool = Pool() 
func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
pool.map(func, data) 

これらの方法は、あなたのProcessのワークスペースの重複を避けるいますが、モジュールにあなたをリロードする必要があるとして開始する少し遅くなることができます使っている。

+0

ありがとうございます、私はこれを見ていきますが、16GBのRAMが利用可能なシステムでは、100MB(または2ギガ)もあまり扱いすぎないと思っています。また、 'pool = Pool()'メソッドは、Pythonのドキュメントに従っても、マルチプロセッシングライブラリを使う方法です。 –

+0

私は私の答えを明確にしました。 startメソッドは、ここで 'MemoryError'を引き起こしている' fork'とは異なるメソッドを使用して、サブプロセスを起動するようにマルチプロセスに頼みます。 –

+0

現在、実際に使用されているすべてのUNIXライクなオペレーティングシステムでは、メモリ管理にcopy-on-writeがあります。つまり、同じメモリページはプロセス間で共有されます。プロセスがページ内の何かを変更するときだけプライベートコピーを取得します。 –

関連する問題