2016-09-21 2 views
8

MxN配列をレンダリングする関数が定義されています。 配列は非常に巨大なので、私はマルチ処理/スレッドと同時に最終的に小さな配列(M1xN、M2xN、M3xN --- MixN。M1 + M2 + M3 + --- Mi = M)を生成する関数を使いたいこれらの配列を結合してmxn配列を形成します。氏Boardriderは当然実行可能な例を提供するために示唆したように、次の例は広く、私はより多くの時間がかかりますxy増加システムの長さとしてマルチ処理/スレッドを使用してnumpyの配列演算をチャンクに分解する

import numpy as n 
def mult(y,x): 
    r = n.empty([len(y),len(x)]) 
    for i in range(len(r)): 
     r[i] = y[i]*x 
    return r 
x = n.random.rand(10000) 
y = n.arange(0,100000,1) 
test = mult(y=y,x=x) 

をするつもり何を伝えるでしょう。この例では、私は4つのコアを持っている場合、私は仕事の四半期を与えることができるように、このコードを実行したいですr[0]r[24999]最初のコアにr[25000]r[49999]第3コアにはr[50000]~r[74999]が、第4コアには~がそれぞれ割り当てられている。最終的に結果をまとめ、1つの配列を得るためにそれらを追加するr[0]からr[99999]

この例ではわかりやすさがあることを望みます。私の問題がまだ明らかでない場合は、教えてください。こので、それは同じプロセッサ上で複数のコアについてです場合numpyはすでに私たちが今まで手作業で行う可能性がより良い操作を並列化が可能である(multiplication of large arrays in pythonでの議論を参照)

+1

[mcve]についてはどうですか? – boardrider

+0

マルチスレッド/プロセスであっても、numpyの内部ブロードキャストメカニズムよりもPythonで何か速くプログラムすることは決してありません... numpyを内部的にしましょう – Aaron

+0

複数スレッド/プロセスを使用しないように注意してください。膨大な量のデータに対して少量の作業を行うだけでは、CPUがメモリバス速度(CPUのキャッシュなどと比較して遅い)に抑えられます。したがって、アルゴリズムがI/Oバウンドの場合、スレッドを追加してもスピードは向上しません。 – bazza

答えて

6

言うべき最初の事はありますケースの鍵は、乗算は、すべての卸売アレイ動作ではなく、Pythonのfor -loopで行われることを確保するために、単純に次のようになります。

test2 = x[n.newaxis, :] * y[:, n.newaxis] 

n.abs(test - test2).max() # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations 

[あなたが実際には複数の独立したCPU間、これを広めたいと思った場合、それは違います問題ですが、その質問は。


]シングル(マルチコア)CPUを示唆してOK、心の中で上記ベアリング:あなたはちょうどmult()よりも複雑な操作を並列化したいとしましょう。 numpyが並列化できる大規模な配列操作に操作を最適化しようとしましたが、あなたの操作はこれに影響されないとしましょう。その場合、lock=Falsemultiprocessing.Poolで作成された共有メモリmultiprocessing.Arrayを使用して、重複しないチャンクを処理し、yディメンションで分割します(必要に応じて同時にxを超える場合もあります)。リストの例を以下に示します。この方法は、明示的に指定したものを明示的に実行するものではなく(結果をまとめて単一の配列に追加する)ことに注意してください。むしろ、より効率的な処理を行います。複数のプロセスが、共有メモリの重複しない部分で同時に複数のプロセスを組み込んで応答します。完了したら、照合/追加は必要ありません。ただ結果を読み出すだけです。

import os, numpy, multiprocessing, itertools 

SHARED_VARS = {} # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see http://stackoverflow.com/questions/1675766/ 

def operate(slices): 
    # grok the inputs 
    yslice, xslice = slices 
    y, x, r = get_shared_arrays('y', 'x', 'r') 
    # create views of the appropriate chunks/slices of the arrays: 
    y = y[yslice] 
    x = x[xslice] 
    r = r[yslice, xslice] 
    # do the actual business 
    for i in range(len(r)): 
     r[i] = y[i] * x # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself. 
         # But let's assume this is a placeholder for something more complicated. 

    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size) 

def check(y, x, r): 
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis] # obviously this check will only be valid if operate() literally does only multiplication (in which case this whole business is unncessary) 
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max()) 
    return y, x, r 

def slicestr(s): 
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step]) 

def m2n(buf, shape, typecode, ismatrix=False): 
    """ 
    Return a numpy.array VIEW of a multiprocessing.Array given a 
    handle to the array, the shape, the data typecode, and a boolean 
    flag indicating whether the result should be cast as a matrix. 
    """ 
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape) 
    if ismatrix: a = numpy.asmatrix(a) 
    return a 

def n2m(a): 
    """ 
    Return a multiprocessing.Array COPY of a numpy.array, together 
    with shape, typecode and matrix flag. 
    """ 
    if not isinstance(a, numpy.ndarray): a = numpy.array(a) 
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix) 

def new_shared_array(shape, typecode='d', ismatrix=False): 
    """ 
    Allocate a new shared array and return all the details required 
    to reinterpret it as a numpy array or matrix (same order of 
    output arguments as n2m) 
    """ 
    typecode = numpy.dtype(typecode).char 
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix 

def get_shared_arrays(*names): 
    return [m2n(*SHARED_VARS[name]) for name in names] 

def init(*pargs, **kwargs): 
    SHARED_VARS.update(pargs, **kwargs) 

if __name__ == '__main__': 

    ylen = 1000 
    xlen = 2000 

    init(y=n2m(range(ylen))) 
    init(x=n2m(numpy.random.rand(xlen))) 
    init(r=new_shared_array([ylen, xlen], float)) 

    print('Master process ID is %s' % os.getpid()) 

    #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r')) # local test 

    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items()) 
    yslices = [slice(0,333), slice(333,666), slice(666,None)] 
    xslices = [slice(0,1000), slice(1000,None)] 
    #xslices = [slice(None)] # uncomment this if you only want to divide things up in the y dimension 
    reports = pool.map(operate, itertools.product(yslices, xslices)) 
    print('\n'.join(reports)) 
    y, x, r = check(*get_shared_arrays('y', 'x', 'r')) 
関連する問題