私は、1行あたり1000万語の25Gbの平文ファイルを持っています。それぞれの行は個別に処理する必要があり、並行して処理されるダースの作業者にチャンクを分割しようとしています。現在のところ、一度に100万行をロードしています(何らかの理由で、ディスク上に〜3Gbしか圧縮されていないにもかかわらず10GbのRAMが必要です)。これを12通りに均等に分割し、マルチプロセッシングを使用して12人のワーカーにマッピングします。Pythonは大容量のファイルを並列処理する際にRAMを解放しません
問題は、自分の割り当てられたデータの処理が完了すると、RAMが解放されず、翌々々の100万回の繰り返しで〜10Gbが増えるだけです。
以前のデータを空の割り当てにリセットし、削除後にeval()、gc.collect()で繰り返し可能な変数名を作成し、そのIOを完全に分離してみましたすべて運がなく、まったく同じ問題はありません。デバッグを実行すると、Pythonインタプリタは予期されたデータのみを認識しており、以前の反復のデータにアクセスできないことを示しています。なぜRAMが実際に解放されていないのですか?
以下のコードは、すべての環境を分離しようとしている最新の反復ですが、最も効率的ではありませんが、「BigFileOnDisk」はSSD上にあるため、実際にデータを処理するのと比較して無視してください。従来は、割り当て機能内に「読み取り」機能があり、作業者が終了した後にすべてのデータが削除され、同じ結果が得られました。
def allocation():
fileCompleted = False
currentLine = 0
while not fileCompleted:
lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
list_of_values(function_object=worker, inputs=lineData, workers=12)
def read(numLines, startLine=0):
currentLine = 0
lines = []
with open(BigFileOnDisk, 'r') as fid:
for line in fid:
if currentLine >= startLine:
lines.append(line)
if currentLine - startLine >= numLines:
return lines, counter, False
currentLine += 1
# or if we've hit the end of the file
return lines, counter, True
def worker(lines):
outputPath = *root* + str(datetime.datetime.now().time())
processedData = {}
for line in lines:
# process data
del lines
with open(outputPath, 'a') as fid:
for item in processedData:
fid.write(str(item) + ', ' + str(processedData[item]) + '\n')
def list_of_values(function_object, inputs, workers = 10):
inputs_split = []
subsection_start = 0
for n in range(workers):
start = int(subsection_start)
end = int(subsection_start + len(inputs)/workers)
subsection_start = end
inputs_split.append(inputs[start:end])
p = Pool(workers)
p.map(function_object, inputs_split)
コードを確認する必要があります。 – refi64
@ kirbyfan64sos好ましくは[mcve]です。 – jpmc26
コードが投稿されました – MKennedy