私はこのRDDをブロードキャストしました。RDDのブロードキャスト変数を更新
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
return :
{0: 1, 1: 2, 2: 3, 3: 4}
私はタプルのリストである他のRDDあります
tuples=sc.parallelize([(0,1),(1,2),(3,2)])
私の目標は、私の放送の変数のためのキーとしてタプルを使用し、そうするための1つの
でその値を更新することですがタプル(0,1)私の新しい放送変数になります。タプル(3,2)
{0: 2, 1: 4, 2: 5, 3: 5}
可変{0: 2, 1: 4, 2: 5, 3: 5}
I放送最後の更新を返すためのタプル(1,2)
{0: 2, 1: 4, 2: 4, 3: 4}
ため
{0: 2, 1: 3, 2: 3, 3: 4}
それをコード化しようとしましたが、私の結果は良くありません。それぞれのタプルは1つ増えていますが、考慮しません最後の結果。
def modify_broadcast(j,test):
main=j[0]
context=j[1]
test.value[main]=test.value[main]+1
test.value[context]=test.value[context]+1
return test.value
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
test = sc.broadcast(test.collectAsMap())
print(test.value[0])
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test))
Thx suresh、それを行う別の解決策をご存知ですか? –
アキュムレータ(AccumulatorParamクラス)を試すことができます。あなたはそれを完了するために少し微調整することができます。インプレースの追加などが必要です。これをチェックしてください http://www.opensyssoft.com/2015/07/custom-accumulators-in-spark-using.html – Suresh
はいそれはうまくいきました –