2016-06-11 23 views
5

Apache SparkとPythonを使って行列の乗算をしようとしています。Pythonを使ったスパーク行列の乗算

mat = mat1 * mat2 
:ここ

mat1 = RowMatrix(rows_1) 
mat2 = RowMatrix(rows_2) 

が、私はこのような何かをしたいと思い、私のデータ

from pyspark.mllib.linalg.distributed import RowMatrix 

ベクトル

rows_1 = sc.parallelize([[1, 2], [4, 5], [7, 8]]) 
rows_2 = sc.parallelize([[1, 2], [4, 5]]) 

マイmaxtrixのマイRDDです

行列乗算を処理する関数を書きましたが、処理時間が長くなることが恐れられます。ここに私の機能があります:

def matrix_multiply(df1, df2): 
    nb_row = df1.count()  
    mat=[] 
    for i in range(0, nb_row): 
     row=list(df1.filter(df1['index']==i).take(1)[0]) 
     row_out = [] 
     for r in range(0, len(row)): 
      r_value = 0 
      col = df2.select(df2[list_col[r]]).collect() 
      col = [list(c)[0] for c in col] 
      for c in range(0, len(col)): 
       r_value += row[c] * col[c] 
      row_out.append(r_value)    
     mat.append(row_out) 
    return mat 

私の機能は、たくさんのスパークアクション(テイク、コレクトなど)を行います。機能には多くの処理時間がかかりますか? 誰かが別のアイデアを持っていると、それは私の役に立つでしょう。

答えて

8

できません。 RowMatrixは意味のある行インデックスを持たないため、乗算に使用することはできません。分散行列which supports multiplication with another distributed structureのみがBlockMatrixであることを無視しても。

from pyspark.mllib.linalg.distributed import * 

def as_block_matrix(rdd, rowsPerBlock=1024, colsPerBlock=1024): 
    return IndexedRowMatrix(
     rdd.zipWithIndex().map(lambda xi: IndexedRow(xi[1], xi[0])) 
    ).toBlockMatrix(rowsPerBlock, colsPerBlock) 

as_block_matrix(rows_1).multiply(as_block_matrix(rows_2)) 
+1

ありがとうございました。しかし、それは私のためには機能しません。私はSpark 1.5.0を使用しています。 ** AttributeError: 'BlockMatrix'オブジェクトに 'multiply'属性がありません** – Raouf

+1

1.6で導入されました。 – zero323

+1

わかりました。私はそれを処理する関数を作成します(上記のポストを参照してください)。 – Raouf