2012-04-01 15 views
19

私はPythonのmultiprocessingモジュールを使用して、大量のnumpy配列を並列に処理しています。配列は、マスタプロセスでnumpy.load(mmap_mode='r')を使用してメモリマップされます。その後、multiprocessing.Pool()がプロセスをフォークします(私は推測します)。 <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>に中NumPyとマルチプロセッシングとmmap

を無視

はAttributeError( " 'NoneType' オブジェクトが属性を持っていない '伝える'"):

すべてが同様に私はラインを取得しています除き、正常に動作するようですunittestログ。それでもテストはうまくいく。

何が起こっているのでしょうか?

Python 2.7.2、OS X、NumPy 1.6.1を使用しています。


UPDATE:

いくつかのデバッグした後、私は(の小さなスライスに)Pool.imapコールへの入力として、このメモリマップnumpyのアレイを使用していたコードパスに原因を追い詰め。

「問題」は、multiprocessing.Pool.imapが入力を新しいプロセスに渡す方法であることが明らかです。それはpickleを使用します。これはmmap edpy配列では機能せず、エラーの原因となるブレーク内の何かが発生します。

私はthis replyで、同じ問題を解決すると思われるRobert Kernが見つかりました。彼は、imapの入力がメモリマップされた配列から来たときに特別なコードパスを作成することを提案しています。生成されたプロセスで同じ配列を手動でメモリマッピングします。

これは非常に複雑で醜いので、私はむしろエラーと余分なメモリコピーで暮らしています。既存のコードを変更する上で軽い他の方法はありますか?

答えて

22

通常の方法(余分なメモリコピーを使用できる場合)は、すべてのIOを1つのプロセスで実行してから、ワーカースレッドのプールに送信することです。 memmapされた配列のスライスをメモリにロードするには、x = np.array(data[yourslice])data[yourslice].copy()は実際にはこれを実行しませんが、混乱を招く可能性があります)。

まず第一に、のは、いくつかのテストデータを生成してみましょう:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

あなたはこのようなものを使用してエラーを再現できます

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

そして、あなただけの代わりにnp.array(data[start:stop])をもたらすに切り替えた場合、あなた'LL問題を修正してください:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

もちろん、各チャンクのインメモリコピー。

長期的には、memmappedファイルから切り離してHDFのようなものに移動する方が簡単です。これは、データが多次元である場合に特に当てはまります。 (私はh5pyをお勧めしますが、pyTablesはあなたのデータが "テーブルのような"ものならいいです。)

幸い、どんなレートでも!

+0

あなたの答えは常に揺れます。私はちょうどこれのようなものを理解しようとしてきました。 – YXD

+0

HDFチップをありがとう。広大な変化のように見えるが、価値があるかもしれないが、私はそれをチェックする。 – user124114

関連する問題