2017-06-19 1 views
1

学習目的のために、アキュムレータのグローバル変数として辞書を設定しようとしましたが、add関数はうまく動作しますが、コードを実行して辞書をマップ関数に置きました。空の。グローバル変数としてdictを持つpyspark内のアキュムレータ

しかし、アクション内で行わアキュムレータの更新については、グローバル変数

class DictParam(AccumulatorParam): 
    def zero(self, value = ""): 
     return dict() 

    def addInPlace(self, acc1, acc2): 
     acc1.update(acc2) 


if __name__== "__main__": 
    sc, sqlContext = init_spark("generate_score_summary", 40) 
    rdd = sc.textFile('input') 
    #print(rdd.take(5)) 



    dict1 = sc.accumulator({}, DictParam()) 


    def file_read(line): 
     global dict1 
     ls = re.split(',', line) 
     dict1+={ls[0]:ls[1]} 
     return line 


    rdd = rdd.map(lambda x: file_read(x)).cache() 
    print(dict1) 
+0

私の問題は、マップが常に空であることです。 – user3341953

答えて

1

print(dict1())は、rdd.map()よりも前に実行されると思います。

  • 変換が将来計算
  • とアクション、アクションのためにそのコールを説明、および実際の実行
をトリガすることは、スパークで

operationsの2種類があります

アキュムレータは、some action is executed

アキュムレータは、Sparkの遅延評価モデルを変更しません。それらが がRDD上の操作内で更新されている場合、それらの値は、RDDがアクションの一部として計算されると、 のみが更新されます。

あなたはドキュメントのこのセクションの最後をチェックアウトする場合は、例は、まさにあなたのようにあります:

accum = sc.accumulator(0) 
def g(x): 
    accum.add(x) 
    return f(x) 
data.map(g) 
# Here, accum is still 0 because no actions have caused the `map` to be computed. 

ですから、例えば、いくつかのアクションを追加する必要があります。

rdd = rdd.map(lambda x: file_read(x)).cache() # transformation 
foo = rdd.count() # action 
print(dict1) 

さまざまなRDD機能とアキュムレータの特性の詳細を確認してください。これが結果の正確さに影響する可能性があります。 (例えば、rdd.take(n)はデフォルトではonly scan one partitionで、データセット全体ではありません)

+0

ありがとう、私は今それを試してみます。 – user3341953

1

としてリストを設定するための類似したコードは、その値は、RDDは、アクションの一部として計算されていることを一度だけ更新 ある

+0

ありがとうございます。なぜ私のコードでは、グローバル変数としての辞書は更新されず、空のままであるのですか?私はリストケースを疲れ、それはうまく動作します。もっと説明できますか?事前に感謝 – user3341953

関連する問題