2016-07-05 4 views
0

アキュムレータをタイプListのpysparkに定義し、ワーカーノードの文字列値を蓄積したいとします。ここで私が持っているコードは:スパークのカスタムアキュムレータクラス

class ListParam(AccumulatorParam): 
def zero(self, v): 
    return [] 
def addInPlace(self, acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

I次いで

accu = sc.accumulator([], ListParam()) 

以下のように、このタイプのアキュムレータを定義し

accu.add("abc") 

を次のようにエグゼキュータでそれに異なる値を追加します値abcをアキュムレータ内の1つの値として表示したいのですが、アキュムレータは3つの異なる値(1つのpr文字)を追加し、accuドライバの値は['a','b','c']のように見えます。アキュムレータの別のエントリとして各文字を追加しないように、どのように変更できますか?

--------------編集----------------

次のように私は私のアキュムレータのために別のカスタムクラスを定義し

class VectorAccumulatorParam(AccumulatorParam): 
def zero(self, value): 
    return [0.0] * len(value) 
def addInPlace(self, val1, val2): 
    for i in range(len(val1)): 
     val1[i] += val2[i] 
    return val1  

と労働者の中に、私は次のコード

global accu 
accu += [accuracy] 

を持っているが、私はドライバーにaccuを印刷するとき、それが空です。何か間違っている?

+0

どのように使用されますか? –

答えて

0

アキュムレータを処理する操作を実行するようにsparkに明示的に指示しましたか?あなたが知っておかなければならないので、spark's operations are lazyと何度も実際にあなたのマッピングを実行するためにrdd.collect()を呼び出す必要があります