2012-01-03 8 views
15

非常に大きなcsvファイル(64MBから500MB)を取り込み、行ごとに作業し、小さな固定サイズの ファイルを出力するマルチプロセッシングを使用して、アプリケーションを並列化しようとしています。大きなファイルからマルチプロセッシングのデータをチャンクする?

現在、私は残念ながら、メモリに完全に ロードされlist(file_obj)を行う(私は思う)、私はその後、私は、n個の部分にそのリストを破る、nは私が実行したいプロセスの 数です。次に、 リストの上にpool.map()を作成します。

これは、単一の スレッド、ちょうどオープン・ファイル・アンド・イテレート・オーバー・イット・メソドロジと比較して、本当に悪いランタイムを持つようです。誰か より良い解決策を提案できますか?

さらに、特定の列の値を に保存するグループ内のファイルの行を処理する必要があります。これらの行グループは、それ自体で分割することもできますが、 グループはこの列に複数の値を含める必要がありません。

答えて

14

list(file_obj)は、fileobjが大きい場合に多くのメモリを必要とする可能性があります。私たちは、必要なときに塊を引き出すために、itertoolsを使ってメモリ要件を減らすことができます。特に

、我々は一度にnum_chunksチャンクのマルチプロセッシング・プールの仕事を持つように加工可能なチャンクにファイルを分割し、

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] 
result = pool.map(worker, groups) 

する

reader = csv.reader(f) 
chunks = itertools.groupby(reader, keyfunc) 

を使用することができます。

こうすることで、ファイル全体ではなく、メモリ内に少数のチャンク(num_chunks)チャンクを保持するだけのメモリが必要になります。


import multiprocessing as mp 
import itertools 
import time 
import csv 

def worker(chunk): 
    # `chunk` will be a list of CSV rows all with the same name column 
    # replace this with your real computation 
    # print(chunk) 
    return len(chunk) 

def keyfunc(row): 
    # `row` is one row of the CSV file. 
    # replace this with the name column. 
    return row[0] 

def main(): 
    pool = mp.Pool() 
    largefile = 'test.dat' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 
    main() 
私は線が相互にされていないと述べた場合、私は嘘をつい
+0

- CSVには、(名前列で分割する必要がある列があり、その名前を持つすべての行ができません分裂する)。しかし、私はこれをこの基準でグループ化することができます。ありがとう!私はitertoolsについて何も知りませんでした。 – user1040625

+0

元のコードにエラーがありました。 'pool.apply_async'へのすべての呼び出しは非ブロックであるため、ファイル全体が一度にキューに入れられていました。これはメモリ節約をもたらさなかったであろう。だから私は一度に 'num_chunks'を待ち行列に入れるためにループを少し変更しました。 'pool.map'への呼び出しはブロックされており、ファイル全体が一度にキューに入れられることはありません。 – unutbu

+0

@HappyLeapSecondユーザーがここでメソッドを実装しようとしていますhttp://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiprocessingそして問題を抱えています。おそらくあなたは助けることができますか? – m0meni

1

私はそれを簡単に保つでしょう。 1つのプログラムでファイルを開き、1行ずつ読み込みます。いくつのファイルを分割するかを選択し、その多くの出力ファイルを開き、すべての行が次のファイルに書き込むことができます。これによりファイルはnに分割されます。次に、それぞれのファイルに対してPythonプログラムを並列に実行することができます。

関連する問題