2016-03-24 13 views
2

pyspark accumulatorを使用してrddから推測された値を行列に追加したいです。私は文書が少し不明であることを発見した。バックグラウンドのビットを追加します。
私のrddDataには、1つのカウントをマトリックスに追加しなければならないインデックスのリストが含まれています。たとえば、このリストはインデックスにマップ:pyspark行列アキュムレータ

from pyspark.accumulators import AccumulatorParam 
class MatrixAccumulatorParam(AccumulatorParam): 
    def zero(self, mInitial): 
     import numpy as np 
     aaZeros = np.zeros(mInitial.shape) 
     return aaZeros 

    def addInPlace(self, mAdd, lIndex): 
     mAdd[lIndex[0], lIndex[1]] += 1 
     return mAdd 

だから、これは私のマッパー機能である:

def populate_sparse(lIndices): 
    for i1 in lIndices: 
     for i2 in lIndices: 
      oAccumilatorMatrix.add([i1, i2]) 

そしてデータを実行します。今すぐ
[1,3,4] -> (11), (13), (14), (33), (34), (44)

、ここで私のアキュムレータである

oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam()) 

rddData.map(populate_sparse).collect() 

今、私のdaを見るとta:

sum(sum(oAccumilatorMatrix.value)) 
#= 0.0 

これはありません。私は何が欠けていますか?最初は疎行列でこれを試してみました

EDIT は、スパース行列がサポートされていないと、このトレースバックを得ました。高密度な数値マトリックスの変更された質問:

... 

    raise IndexError("Indexing with sparse matrices is not supported" 
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes. 

答えて

0

Aha!私はそれを得たと思う。アキュムレータは、一日の終わりに、それ自身の部品をそれ自体に追加する必要があります。だから今、それはそれはリストを与えられたときにインデックスを追加し、私の最後のマトリックスを作成するためにpopulate_sparse機能ループの後に自分自身を追加し

def addInPlace(self, mAdd, lIndex): 
    if type(lIndex) == list: 
     mAdd[lIndex[0], lIndex[1]] += 1 
    else: 
     mAdd += lIndex 
    return mAdd 

:だから、へaddInPlaceを変更します。

関連する問題