2016-03-08 21 views
5

は、私はこれに似ている、という問題がある。Pythonのマルチプロセッシングと共有numpyの配列

import numpy as np 

C = np.zeros((100,10)) 

for i in range(10): 
    C_sub = get_sub_matrix_C(i, other_args) # shape 10x10 
    C[i*10:(i+1)*10,:10] = C_sub 

ので、どうやら各部分行列を独立に計算することができるので、シリアル計算としてこれを実行する必要はありません。 マルチプロセッシングモジュールを使用して、forループのプロセスを4つまで作成したいと思います。 私はマルチプロセッシングに関するいくつかのチュートリアルを読んだが、これを使って自分の問題を解決する方法を理解することはできなかった。あなたの助けのための

おかげ

+2

:あなたは、プレーンmapstarmapを置き換えることができますが、その後、あなたは、単一のパラメータを取る関数を提供する必要があります。マルチプロセッシングはデータをシリアル化*してサブプロセスに送信し、それをデシリアライズして計算を実行し、結果をシリアル化してメインプロセスに戻し、最後にデシリアライズします。シリアライゼーション/デシリアライゼーションにかなりの時間がかかり、プロセス間の通信もそれほど高速ではありません。 'get_sub_matrix'が文字通りわずかな行列アクセスであるなら、あなたはどんな高速化も得られません。 – Bakuriu

+0

これは説明のためのものです。最終的に私の行列はおよそ100000 x 20000の大きさになりますが、get_sub_matrix_Cのほうが遅いのはもっと重要ですし、私はその関数をもっと速くすることはできません。 – RoSt

+0

get_sub_matrix_Cはすべてのマトリックスまたはサブマトリックスにアクセスする必要がありますか?なぜなら、すべてが必要な場合、各サブプロセスのビッグ・マトリックスの1つのコピーの直列化は、非常に時間がかかり、メモリーを消費するからです。 – eguaio

答えて

4

そのコードを並列化する簡単な方法は、プロセスのPoolを使用することです:

pool = multiprocessing.Pool() 
results = pool.starmap(get_sub_matrix_C, ((i, other_args) for i in range(10))) 

for i, res in enumerate(results): 
    C[i*10:(i+1)*10,:10] = res 

get_sub_matrix_C関数は複数の引数(starmap(f, [(x1, ..., xN)])呼び出しf(x1, ..., xN))を持っているので、私はstarmapを使用しました。

ただし、シリアライズ/デシリアライゼーションにはかなりの時間がかかります(および)ので、オーバーヘッドを避けるために、より低レベルのソリューションを使用する必要があります。


古くなったバージョンのPythonを実行しているようです。 **かなりの時間を取る必要があります**性能向上に計算を得るために、マルチプロセッシングためには

def f(args): 
    return get_sub_matrix_C(*args) 

pool = multiprocessing.Pool() 
results = pool.map(f, ((i, other_args) for i in range(10))) 

for i, res in enumerate(results): 
    C[i*10:(i+1)*10,:10] = res 
+0

あなたの答えをありがとう。残念ながら私はそれをテストすることはできません。なぜなら、私はストラメアを持っていないからです。おそらく私は古いバージョンのマルチプロセッシングを使用していますか?バージョン:0.70a1 – RoSt

+0

@RoSt 'map'を使用して、関数を修正して単一のパラメータを受け入れることができます。この解決法を追加する答えを編集しました。 – Bakuriu

+0

簡単で簡単な解決法をお寄せいただきありがとうございます。それはうまく動作します。私はあなたに投票しますが、自分の評判は<15です、申し訳ありません... – RoSt

0

以下のレシピは、おそらく仕事をすることができます。お気軽にお尋ねください。

import numpy as np 
import multiprocessing 

def processParallel(): 

    def own_process(i, other_args, out_queue): 
     C_sub = get_sub_matrix_C(i, other_args) 
     out_queue.put(C_sub)    

    sub_matrices_list = [] 
    out_queue = multiprocessing.Queue() 
    other_args = 0 
    for i in range(10): 
     p = multiprocessing.Process(
          target=own_process, 
          args=(i, other_args, out_queue)) 
     procs.append(p) 
     p.start() 

    for i in range(10): 
     sub_matrices_list.extend(out_queue.get()) 

    for p in procs: 
     p.join() 

    return sub_matrices_list  

C = np.zeros((100,10)) 

result = processParallel() 

for i in range(10): 
    C[i*10:(i+1)*10,:10] = result[i] 
+0

あなたの答えをありがとう。私はそれを試みましたが、私は混乱した結果を得ました。同じ項目が何度も何度も繰り返された。 – RoSt

+1

バグを修正しました。申し訳ありません。とにかく、もう一つの答えは、より簡潔で実用的なようです。私も自分自身でそれを試してみましょう! :) – eguaio