pyspark accumulatorを使用してrdd
から推測された値を行列に追加したいです。私は文書が少し不明であることを発見した。バックグラウンドのビットを追加します。
私のrddData
には、1つのカウントをマトリックスに追加しなければならないインデックスのリストが含まれています。たとえば、このリストはインデックスにマップ:pyspark行列アキュムレータ
from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
def zero(self, mInitial):
import numpy as np
aaZeros = np.zeros(mInitial.shape)
return aaZeros
def addInPlace(self, mAdd, lIndex):
mAdd[lIndex[0], lIndex[1]] += 1
return mAdd
だから、これは私のマッパー機能である:
def populate_sparse(lIndices):
for i1 in lIndices:
for i2 in lIndices:
oAccumilatorMatrix.add([i1, i2])
そしてデータを実行します。今すぐ
[1,3,4] -> (11), (13), (14), (33), (34), (44)
、ここで私のアキュムレータである
oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())
rddData.map(populate_sparse).collect()
今、私のdaを見るとta:
sum(sum(oAccumilatorMatrix.value))
#= 0.0
これはありません。私は何が欠けていますか?最初は疎行列でこれを試してみました
EDIT は、スパース行列がサポートされていないと、このトレースバックを得ました。高密度な数値マトリックスの変更された質問:
...
raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.