2016-07-19 15 views
4

私は、1行あたり1000万語の25Gbの平文ファイルを持っています。それぞれの行は個別に処理する必要があり、並行して処理されるダースの作業者にチャンクを分割しようとしています。現在のところ、一度に100万行をロードしています(何らかの理由で、ディスク上に〜3Gbしか圧縮されていないにもかかわらず10GbのRAMが必要です)。これを12通りに均等に分割し、マルチプロセッシングを使用して12人のワーカーにマッピングします。Pythonは大容量のファイルを並列処理する際にRAMを解放しません

問題は、自分の割り当てられたデータの処理が完了すると、RAMが解放されず、翌々々の100万回の繰り返しで〜10Gbが増えるだけです。

以前のデータを空の割り当てにリセットし、削除後にeval()、gc.collect()で繰り返し可能な変数名を作成し、そのIOを完全に分離してみましたすべて運がなく、まったく同じ問題はありません。デバッグを実行すると、Pythonインタプリタは予期されたデータのみを認識しており、以前の反復のデータにアクセスできないことを示しています。なぜRAMが実際に解放されていないのですか?

以下のコードは、すべての環境を分離しようとしている最新の反復ですが、最も効率的ではありませんが、「BigFileOnDisk」はSSD上にあるため、実際にデータを処理するのと比較して無視してください。従来は、割り当て機能内に「読み取り」機能があり、作業者が終了した後にすべてのデータが削除され、同じ結果が得られました。

def allocation(): 
    fileCompleted = False 
    currentLine = 0 
    while not fileCompleted: 
     lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine) 
     list_of_values(function_object=worker, inputs=lineData, workers=12) 


def read(numLines, startLine=0): 
    currentLine = 0 
    lines = [] 
    with open(BigFileOnDisk, 'r') as fid: 
     for line in fid: 
      if currentLine >= startLine: 
       lines.append(line) 
      if currentLine - startLine >= numLines: 
       return lines, counter, False 
      currentLine += 1 
     # or if we've hit the end of the file 
     return lines, counter, True 


def worker(lines): 
    outputPath = *root* + str(datetime.datetime.now().time()) 
    processedData = {} 

    for line in lines: 
     # process data 

    del lines 
    with open(outputPath, 'a') as fid: 
     for item in processedData: 
      fid.write(str(item) + ', ' + str(processedData[item]) + '\n') 


def list_of_values(function_object, inputs, workers = 10): 
    inputs_split = [] 
    subsection_start = 0 
    for n in range(workers): 
     start = int(subsection_start) 
     end = int(subsection_start + len(inputs)/workers) 
     subsection_start = end 

     inputs_split.append(inputs[start:end]) 

    p = Pool(workers) 
    p.map(function_object, inputs_split) 
+0

コードを確認する必要があります。 – refi64

+0

@ kirbyfan64sos好ましくは[mcve]です。 – jpmc26

+0

コードが投稿されました – MKennedy

答えて

4

サブプロセスに参加していません。 list_of_valuesの後にPoolによって作成されたプロセスがまだ生きています(ちょっとゾンビのようですが、生きている親プロセスがあります)。彼らはまだすべての価値を保持しています。他のプロセスで同じデータが表示されているため(同じ理由でgc.collectが機能していないため)、メインのデータは表示されません。

ワーカーによって割り当てられたメモリを解放するには、手動でPoolに参加するか、withを手動で参加させる必要があります。

def list_of_values(function_object, inputs, workers = 10): 
    inputs_split = [] 
    subsection_start = 0 
    for n in range(workers): 
     start = int(subsection_start) 
     end = int(subsection_start + len(inputs)/workers) 
     subsection_start = end 

     inputs_split.append(inputs[start:end]) 

    with Pool(workers) as p: 
     p.map(function_object, inputs_split) 
+0

ああ、ありがとう! – MKennedy

関連する問題