list(file_obj)
は、fileobj
が大きい場合に多くのメモリを必要とする可能性があります。私たちは、必要なときに塊を引き出すために、itertoolsを使ってメモリ要件を減らすことができます。特に
、我々は一度にnum_chunks
チャンクのマルチプロセッシング・プールの仕事を持つように加工可能なチャンクにファイルを分割し、
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
する
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
を使用することができます。
こうすることで、ファイル全体ではなく、メモリ内に少数のチャンク(num_chunks
)チャンクを保持するだけのメモリが必要になります。
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
# print(chunk)
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name column.
return row[0]
def main():
pool = mp.Pool()
largefile = 'test.dat'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
私は線が相互にされていないと述べた場合、私は嘘をつい
- CSVには、(名前列で分割する必要がある列があり、その名前を持つすべての行ができません分裂する)。しかし、私はこれをこの基準でグループ化することができます。ありがとう!私はitertoolsについて何も知りませんでした。 – user1040625
元のコードにエラーがありました。 'pool.apply_async'へのすべての呼び出しは非ブロックであるため、ファイル全体が一度にキューに入れられていました。これはメモリ節約をもたらさなかったであろう。だから私は一度に 'num_chunks'を待ち行列に入れるためにループを少し変更しました。 'pool.map'への呼び出しはブロックされており、ファイル全体が一度にキューに入れられることはありません。 – unutbu
@HappyLeapSecondユーザーがここでメソッドを実装しようとしていますhttp://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiprocessingそして問題を抱えています。おそらくあなたは助けることができますか? – m0meni