学習目的のために、アキュムレータのグローバル変数として辞書を設定しようとしましたが、add関数はうまく動作しますが、コードを実行して辞書をマップ関数に置きました。空の。グローバル変数としてdictを持つpyspark内のアキュムレータ
しかし、アクション内で行わアキュムレータの更新については、グローバル変数
class DictParam(AccumulatorParam):
def zero(self, value = ""):
return dict()
def addInPlace(self, acc1, acc2):
acc1.update(acc2)
if __name__== "__main__":
sc, sqlContext = init_spark("generate_score_summary", 40)
rdd = sc.textFile('input')
#print(rdd.take(5))
dict1 = sc.accumulator({}, DictParam())
def file_read(line):
global dict1
ls = re.split(',', line)
dict1+={ls[0]:ls[1]}
return line
rdd = rdd.map(lambda x: file_read(x)).cache()
print(dict1)
私の問題は、マップが常に空であることです。 – user3341953