2012-01-30 38 views
3

私は計算しようとしている金額があり、コードを並列化するのが難しいです。私が並列化しようとしている計算は複雑です(numpy配列とscipy sparse行列の両方を使います)。それは数が少ない配列を吐き出し、私は約1000の計算から出力配列を合計したいと思います。理想的には、私はすべての反復にわたって実行中の合計を保持します。しかし、私はこれを行う方法を理解することができませんでした。python numpyで合計計算を並列化する方法は?

これまで、私はjoblibのParallel関数とpythonのマルチプロセッシングパッケージでpool.map関数を使ってみました。これらの両方のために、私はnumpy配列を返す内部関数を使用します。これらの関数はリストを返します。リストはnumpy配列に変換され、さらに合計されます。

しかし、joblib Parallel関数がすべての反復を完了すると、メインプログラムは実行されません(元のジョブは0%CPUを使用して中断状態にあるように見えます)。 pool.mapを使用すると、すべての反復が完了した後にメモリエラーが発生します。

単純に実行中の配列の合計を並列化する方法はありますか?

を編集します。目標は、並行して以外は次のようにすることです。

def summers(num_iters): 

    sumArr = np.zeros((1,512*512)) #initialize sum 
    for index in range(num_iters): 
     sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array 

    return sumArr 
+0

となります。最小限のコード例を投稿してください。畳み込みをしようとしていますか? – Simon

+0

いいえ、私は畳み込みをしていません。私は約1000回画像を回転しています。回転ごとに結果を合計する必要があります。 pool.mapについては、私はちょうど を使用しています。 'outputArr = np.array(pool.map(parloop、range(num_views))' '' parloop'はnumpy配列を返します。 – Kevin

+0

これはすでに並行しているのでしょうか? "numpyは、あなたが行列のドットプロダクトをしたいことを知っているので、" BLAS "(基本線形代数サブルーチン)の一部として得られた最適化された実装を使用することができます。...多くのアーキテクチャは、マルチコアマシンあなたのnumpy/scipyがこれらのいずれかを使ってコンパイルされている場合、dot()は何もしなくても**(これが速ければ)並列に計算されます** " www.scipy.org/ParallelProgramming – endolith

答えて

5

私はマルチプロセッシング、apply_async、およびコールバックでの配列の合計を並列実行する方法を考え出したので、私は他の人のためにここにこれを掲示しています。 Sumコールバッククラスにはthe example page for Parallel Pythonを使用しましたが、実装には実際にそのパッケージを使用しませんでした。それは私にコールバックを使用する考えを与えました。ここでは、私が使い終わったものの簡略化したコードを示します。

import multiprocessing 
import numpy as np 
import thread 

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments) 
    def __init__(self): 
     self.value = np.zeros((1,512*512)) #this is the initialization of the sum 
     self.lock = thread.allocate_lock() 
     self.count = 0 

    def add(self,value): 
     self.count += 1 
     self.lock.acquire() #lock so sum is correct if two processes return at same time 
     self.value += value #the actual summation 
     self.lock.release() 

def computation(index): 
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes 
    return array1 

def summers(num_iters): 
    pool = multiprocessing.Pool(processes=8) 

    sumArr = Sum() #create an instance of callback class and zero the sum 
    for index in range(num_iters): 
     singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add) 

    pool.close() 
    pool.join() #waits for all the processes to finish 

    return sumArr.value 

私はまた、別の答えで示唆された並列化されたマップを使用してこれを行うことができました。私は以前これを試していましたが、正しく実装していませんでした。両方の方法で動作し、私はthis answerがどのメソッド(mapまたはapply.async)を使用するかの問題をかなりうまく説明していると思います。地図バージョンでは、クラスSumを定義する必要はなく、加算関数は

def summers(num_iters): 
    pool = multiprocessing.Pool(processes=8) 

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these 
    sumArr = np.zeros((1,512*512))    #but I do to make sure I have the memory 

    outputArr = np.array(pool.map(computation, range(num_iters))) 
    sumArr = outputArr.sum(0) 

    pool.close() #not sure if this is still needed since map waits for all iterations 

    return sumArr 
1

私はこの問題を理解していません。リストをワーカーのプールに分割し、それらの計算の実行中の合計を保持して、結果を合計するだけですか?

#!/bin/env python 
import sys 
import random 
import time 
import multiprocessing 
import numpy as np 

numpows = 5 
numitems = 25 
nprocs = 4 

def expensiveComputation(i): 
    time.sleep(random.random() * 10) 
    return np.array([i**j for j in range(numpows)]) 

def listsum(l): 
    sum = np.zeros_like(l[0]) 
    for item in l: 
    sum = sum + item 
    return sum 

def partition(lst, n): 
    division = len(lst)/float(n) 
    return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ] 

def myRunningSum(l): 
    sum = np.zeros(numpows) 
    for item in l: 
    sum = sum + expensiveComputation(item) 
    return sum 

if __name__ == '__main__': 

    random.seed(1) 
    data = range(numitems) 

    pool = multiprocessing.Pool(processes=4,) 
    calculations = pool.map(myRunningSum, partition(data,nprocs)) 

    print 'Answer is:', listsum(calculations) 
    print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.]) 

(パーティション関数はPython: Slicing a list into n nearly-equal-length partitionsから来る)

+0

ありがとうございます。最後のリストを合計することなく、複数の反復にわたって実行合計を保持したいと考えました。私は約1000回画像を回転しています。回転ごとに計算結果を合計する必要があります。私はそれを行う方法を考え出し、それを答えとして掲示したと思う。 – Kevin

+0

プロセッサごとに1つの配列が返されるのは本当に大きな問題ですか?この方法ではオーバーヘッドが少なくなります。 –

関連する問題