0

私は並列計算を実装するためにマスタースレーブ構造を使用しています。単一のマスタプロセス(0)がデータをロードし、大きなオブジェクトを使用してスレーブプロセス(1 - N)に重複して関連するチャンクと命令を配布します... blah blah blah。問題はメモリ使用量です。これは各スレーブプロセスでresource.getrusage(resource.RUSAGE_SELF).ru_maxrssを使用して監視しています。Pythonの並列プロセスループでメモリを解放する

最初のタスクは約6GBのメモリを使用しますが、スレーブが2番目のタスクを受信すると、前のメモリが収集されていないかのように最大10GBまで膨張します。私の理解は、変数がその参照を失うとすぐに(_gwb変数がリセットされたときの下のコードで)ガベージコレクションは家をきれいにするはずです。なぜこれは起こっていないのですか?

各ループの最後にdel _gwbを投げてもらえますか?
gc.collect()への手動呼び出しはどうですか?
subprocess esをdescribed in this answerとして生成する必要がありますか?

SLURM管理対象クラスタでmpi4pyを使用しています。

for jj, tt in enumerate(times): 

    for ii, sim in enumerate(sims): 

     search = True 
     # Find a slave to give this task to 
     while search: 
      # Repackage HDF5 data into dictionary to work with MPI 
      sim_dat = ... # load some data 

      # Look for available slave process 
      data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG) 
      src = stat.Get_source() 

      # Store Results 
      if tag == TAGS.DONE: 
       _store_slave_results(data, ...) 
       num_done += 1 
      elif tag == TAGS.READY: 
       # Distribute tasks 
       comm.send(sim_data, dest=src, tag=TAGS.START) 
       # Stop searching, move to next task 
       search = False 

      cycles += 1 

そして奴隷

マスタープロセスは次のようになります

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     _gwb = Large_Data_Structure(task) 
     data = _gwb.do_heavy_lifting(task) 
     comm.send(data, dest=0, tag=TAGS.DONE) 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 

編集:いくつかの他の奇妙な機微(場合には、彼らは、関連するかもしれません) :
1)いくつかのプロセスだけがメモリが増えていることを示します。 rはほぼ同じままです。アクティブ
2)メモリの特定の量は、彼らが必ずしも同じコードを実行する必要があるにもかかわらず... 100s of MBによって異なる(異なるスレーブ・プロセスに異なるです!

答えて

1

del _gwb大きな違いを作る必要があります。_gwb = Large_Data_Structure(task)付き新しいデータが生成されて_gwdに代入され、古いデータが解放されます。特定のdelは、オブジェクトを早期に取り除きます。次の割り当てがまったく同じ数のメモリを獲得するとは言いません。

ガベージコレクタは、通常の参照カウントではメモリを解放するのに十分でない場合にのみ機能します。 do_heavy_liftingがファンキーなことを何もしていないとすれば、違いはありません。

あなたはsubprocessと言います... linuxシステムの別のオプションはos.forkです。子プロセスは、親アドレス空間のコピーオンライトビューを取得します。大きなオブジェクトは子メモリに生成され、終了時には消えます。私はこれが動作することを保証することはできませんが、興味深い実験になります。

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     pid = os.fork() 
     if pid: 
      # parent waits for child 
      os.waitpid(pid) 
     else: 
      # child does work, sends results and exits 
      _gwb = Large_Data_Structure(task) 
      data = _gwb.do_heavy_lifting(task) 
      comm.send(data, dest=0, tag=TAGS.DONE) 
      os._exit() 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 
+0

ありがとうございます!私はこれを試してみましょう。いくつかのプロセスだけがメモリの成長を示し、他のプロセスはおおよそ同じ状態に留まります。 2)特定のメモリ量は異なるスレーブプロセス上で異なっていますが、必ず同じコードを実行する必要があります。 – DilithiumMatrix

+0

私はそのような理由を言うことはできません。計算されたデータは、入力パラメータに敏感な場合があります(たとえば、範囲(count)が1か10000000かによって異なります)。ちょうど推測。 – tdelaney

+0

ええと、私はプロセス間でデータサイズが変わる場所を見つけることができません。 'del_gwb'で試してみたところ、メモリの挙動に変化はありませんでした。' _gwb'オブジェクトは外部オブジェクトへの参照をいくつか保存しています。 '_gwb = Large_Data_Structure(task、other_obj)'と '_gwb'のコンストラクタで'(self.other = other_obj) '...メモリを収集しないようにすることができますか? – DilithiumMatrix

関連する問題