2017-03-09 14 views
0

私はこのRDDをブロードキャストしました。RDDのブロードキャスト変数を更新

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0])) 
return : 
{0: 1, 1: 2, 2: 3, 3: 4} 

私はタプルのリストである他のRDDあります

tuples=sc.parallelize([(0,1),(1,2),(3,2)]) 

私の目標は、私の放送の変数のためのキーとしてタプルを使用し、そうするための1つの

でその値を更新することですがタプル(0,1)私の新しい放送変数になります。タプル(3,2)

{0: 2, 1: 4, 2: 5, 3: 5} 

可変{0: 2, 1: 4, 2: 5, 3: 5}

I放送最後の更新を返すためのタプル(1,2)

{0: 2, 1: 4, 2: 4, 3: 4} 

ため

{0: 2, 1: 3, 2: 3, 3: 4} 

それをコード化しようとしましたが、私の結果は良くありません。それぞれのタプルは1つ増えていますが、考慮しません最後の結果。

def modify_broadcast(j,test): 
    main=j[0] 
    context=j[1] 
    test.value[main]=test.value[main]+1 
    test.value[context]=test.value[context]+1 
    return test.value 

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0])) 
test = sc.broadcast(test.collectAsMap()) 


print(test.value[0]) 
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test)) 

答えて

0

ブロードキャストするときは、共有変数のようなものです。ルックアップ値のように使用して、読み取り専用として扱うことができます。私の学習から、各ワーカーノードはその変数のローカルコピーを持ち、それ自身のコピーを更新します。それらが各ノードに一度だけ渡されるので、他のワーカーノードには反映されません。学習スパーク帳から

放送変数は単に型Tの値をラップタイプspark.broadcast.Broadcast [T]の目的であり、我々は、値を呼び出してこの値にアクセスすることができ私たちのタスクのBroadcastオブジェクトに追加します。この値は、効率的なBitTorrentのような通信メカニズムを使用して、各ノードに一度だけ送信されます。

ブロードキャスト変数を使用するプロセスは簡単です。1.タイプTのオブジェクトでSparkContext.broadcastを呼び出してブロードキャスト[T]を作成します。いずれのタイプも、シリアライズ可能である限り動作します。 2. valueプロパティ(またはJavaのvalue()メソッド)を使用してその値にアクセスします。 3.変数は各ノードに一度だけ送信され、読み取り専用として扱われます(更新は他のノードに伝播されません)。

+0

Thx suresh、それを行う別の解決策をご存知ですか? –

+0

アキュムレータ(AccumulatorParamクラス)を試すことができます。あなたはそれを完了するために少し微調整することができます。インプレースの追加などが必要です。これをチェックしてください http://www.opensyssoft.com/2015/07/custom-accumulators-in-spark-using.html – Suresh

+0

はいそれはうまくいきました –

関連する問題