5

アップデート1.0開始

機能ALS_Y/ALS_Xに渡されたコールPythonでfor-loopを並列化する方法は?

for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 

引数は参照されない場合にはそうXまたはYは非常にlarge matrixesあるとき、それはのような、arguments..Soをコピーし、私の場合、それは6000*40程度です(そして、それはfor-loopです。繰り返し数を50 000とすると、それはメモリの限界を超えています)。
そして、私はそれが出力

import multiprocessing 
import time 
import numpy as np 

def func(idx): 
    global a 
    a[idx] += 1 



if __name__ == "__main__": 
    a=range(10) 
    for j in xrange(2): 
     pool = multiprocessing.Pool(processes=8) 
     result = [] 
     for i in xrange(10): 
      result.append(pool.apply_async(func, (i,))) 
     pool.close() 
     pool.join() 
     print a 
     print "Sub-process(es) done." 

、ちょうど関数にパラメータとしてインデックスを渡して、グローバルな引数を使用してみました: `

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 
Sub-process(es) done. 

So, this means it still copied A`! 今、この問題を処理する方法はありますか?感謝する!

アップデート1.0エンド


以下は行列因数分解の問題を解決するためのpythonでの私のコードです。 W = XY。しかし、以下のコードは効率的ではなく、私はGPUを使用して並列バージョンに変換することができることを願って、最高ですが、CPUも良いです。私は並列プログラミングに関する経験がないので、誰かが私に助言を与えることができますか?以下は

今ALSを使用して行列を因数分解するためのコード(最小二乗を交互に、hereの詳細を)使用済みのマルチプロセッシングのlib後

for ii in range(n_iterations): 
    for u, Wu in enumerate(W): 
     X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
     Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 

、私のコードです:

def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

for ii in range(n_iterations): 
pool = multiprocessing.Pool(processes=12)#create pool 
result = []#store each row for X 
idx = []#store the row number 
for u, Wu in enumerate(W): 
    idx.append(u) 
    result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
pool.close() 
pool.join() 
for u, vector in zip(idx, result): 
    X[u] = vector.get()#assign the result to X 
###################################### 
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X 
result = [] 
idx = [] 
for i, Wi in enumerate(W.T): 
    idx.append(i) 
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
pool.close() 
pool.join() 
for i, vector in zip(idx, result): 
    Y[:,i] = vector.get() 
error = get_error(Q, X, Y, W) 
weighted_errors.append(error) 
print '{}th iteration is completed'.format(ii), 'error: ',error 

しかし、多少悲惨さ、プログラムは常に黙って墜落した...

以下は私のコードの束です。それはすべて乱雑です。ここで私は、ランダム行列を生成するので、ちょうど私がPythonで「並列ループ」のためのあなたの欲求に遭遇し、そして物理学のための私の仕事の一部として、最大1をハッキング昨年..

import pandas as pd 
import numpy as np 
import multiprocessing 

def vec2str(vec): 
    res = '' 
    for dim in len(vec): 
     res += str(vec[dim]) + ',' 
    return res 

def load_data(heads, filename, sep,header=None): 
    data = pd.read_table(filename, sep=sep, header=header, names=heads) 
    rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally... 
    Q = rp.fillna(0) 
    Q = Q.values 
    W = Q >0.5 
    W[W == True] = 1 
    W[W == False] = 0 
    W = W.astype(np.float64, copy=False) 
    return Q, W, rp 

def get_error(Q, X, Y, W): 
    return np.sum((W * (Q - np.dot(X, Y)))**2) 

''' 
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(), 
           np.dot(, np.dot(np.diag(), Q[u].T))).T 
''' 
def ALS_X(Y, Wu, Q, lambda_, n_factors, u): 
    return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T 

''' 
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 
''' 

def ALS_Y(X, Wi, Q, lambda_, n_factors, i): 
    return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), 
           np.dot(X.T, np.dot(np.diag(Wi), Q[:, i]))) 



if __name__ == "__main__": 

    lambda_ = 0.1 
    n_factors = 40 
    filename = 'data_songID' 
    n_iterations = 20 
    #Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',') 
    Q = np.random.rand(1000,1000) 
    m, n = Q.shape 
    W = np.eye(1000) 
    print 'Loading data finished, ', 'size: ', Q.shape 
    print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors) 
    X = 5 * np.random.rand(m, n_factors) 
    Y = 5 * np.random.rand(n_factors, n) 
    errors = [] 
    for ii in range(n_iterations): 
     X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors), 
         np.dot(Y, Q.T)).T 
     Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors), 
         np.dot(X.T, Q)) 
     if ii % 100 == 0: 
      print('{}th iteration is completed'.format(ii)) 
     errors.append(get_error(Q, X, Y, W)) 
     Q_hat = np.dot(X, Y) 
     print('Error of rated movies: {}'.format(get_error(Q, X, Y, W))) 
    print errors 
    #####ALS start....##### 
    print '*'*100 
    weighted_errors = [] 
    for ii in range(n_iterations): 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for u, Wu in enumerate(W): 
      idx.append(u) 
      result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,))) 
     pool.close() 
     pool.join() 
     for u, vector in zip(idx, result): 
      X[u] = vector.get() 
     ###################################### 
     pool = multiprocessing.Pool(processes=12) 
     result = [] 
     idx = [] 
     for i, Wi in enumerate(W.T): 
      idx.append(i) 
      result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,))) 
     pool.close() 
     pool.join() 
     for i, vector in zip(idx, result): 
      Y[:,i] = vector.get() 
     error = get_error(Q, X, Y, W) 
     weighted_errors.append(error) 
     print '{}th iteration is completed'.format(ii), 'error: ',error 

    weighted_Q_hat = np.dot(X,Y) 
    print weighted_errors 
    X.tofile('X.bin') 
    Y.tofile('Y.bin') 
    latent_user_file = open('user_latent','w') 
    for idx in len(rp.axes[0]): 
     latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n') 

    latent_mid_file = open('mid_latent', 'w') 
    for idx in len(rp.axes[1]): 
     latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n') 
+0

マルチスレッドモジュールを使用して2つのスレッドを作成してから結合してください。 –

+1

@binayr:Pythonのスレッドは、グローバルインタープリタロックの対象ですが、NumPyのようなCの拡張機能は、どのように多くの助けをするのかを言うことは難しいです。 – Kevin

+0

Y_innerループはX_innerループの後に実行する必要があるためです。しかし、X_innerループの各反復は同時に実行できます。そして、私はX_innerループを並列に実行できることを願っています...私のマシンは12-CPUと3-GPUです...これらを十分に活用できることを願っていますが、その方法についてはあまり明確ではありません..悲しい.. –

答えて

1

load_dataget_errorvec2strを無視紙。あなたが望むことをする多くのモジュールがありますが、私は実際にはppの任意の機能を使いたいと思っています。

あなたはこのようなものしたい場合:

ResultList = Library_ParallelLoop.Main(
    Function = ExampleFunction, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

をそれから私が代わりに実装が実際にそれが痛いほど多くの行だった作業を取得するように、この記事では私の全体のソースを提供するという私のGitのハブにあなたを指して、 Pythonであらかじめ構築されていなかったことが明らかになっているPython関数を深くコピーすることに関係していました。

検索素数例:

https://github.com/douglasquincyadams/Main/blob/master/Test_ParallelLoop.py

レポ:

https://github.com/douglasquincyadams/Main

お使いのコンピュータのいくつかの暗い隅に私のレポをダウンロードした場合 - その後、 y非常に上記のような並列ループがすでに便利素晴らしく、言語に組み込まれているものになるはずですが、常に何らかの形でひそかウィザードによってexampledているようです - あなたは私に尋ねた場合

import Library_ParallelLoop 

def do_the_thing_function(ii): 
    for u, Wu in enumerate(W): 
    X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors), 
          np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop 

    for i, Wi in enumerate(W.T): 
    Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop 
          np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop 
    error = get_error(Q, X, Y, W) 
    weighted_errors.append(error) 
    print '{}th iteration is completed'.format(ii) 
    return #whatever your result is supposed to be... your code doesn't work on its own 

ListOfArgSets = [] 
for ii in range(n_iterations): 
    ListOfArgSets.append( { "ii" : ii , } ) 

ResultList = Library_ParallelLoop.Main(
    Function = do_the_thing_function, 
    ListOfArgSets = ListOfArgSets, 
    Algorithm = 'pp', 
    PrintExtra = True 
    ) 

:私たちの作業スニペットはする必要がありますタワーの中であなたがあなたの虚偽のラップトップでそれを試してみるとかなりうまくいきません。とにかく - これが助けて欲しい。

追加のノート - 私はあなたがMPIを使用することをARBITRARY大規模な並列化問題(単純なループ以上のもの)を、解決したい場合は、それを許可することができ添えもののすべての種類を持っているので、ということも示唆しています途中でお互いに話をするプロセス。 MPIは、科学者が最大のシミュレーションに使用するのと同じように、非常に大きなジョブ(〜10k +コア)を処理するために設計されたより大きなサイズのクラスタはすべてMPIをサポートしており、ppやマルチプロセッシングモジュールをサポートすることはできません。 PCのすべてのコア(またはネットワーク上のいくつかのPC)を使用したい場合は、最も簡単なものを選択して作業してください。

関連する問題