2017-10-01 6 views
0

を理解しようとしている私は、特定のキーに関連付けられたすべての値の平均値を見つけたいと下記の私のプログラムである:以下reduceByKey()アクションの動作

from pyspark import SparkContext,SparkConf 

conf = SparkConf().setAppName("averages").setMaster("local") 
sc = SparkContext(conf=conf) 

file_rdd = sc.textFile("C:\spark_programs\python programs\input") 

vals_rdd = file_rdd.map(lambda x:(x.split(" ")[0],int(x.split(" ")[2]))) 

print type(vals_rdd) 

pairs_rdd = vals_rdd.reduceByKey(lambda x,y:(x+y)/2) 

for line in pairs_rdd.collect(): 
    print line 

は、入力データである:

私はプログラム私が手に出力を実行

a hyd 2 
b hyd 2 
c blr 3 
d chn 4 
b hyd 5 
は以下の通りです:Bの値aから離れ

(u'a', 2) 
(u'c', 3) 
(u'b', 3) -- I could see only got b's value getting averaged. 
(u'd', 4) 

値は平均化されません。それはなぜ起こるのですか?なぜa、c、dの値は平均化されていないのですか?

答えて

1

reduceByKey is used to

連想と可換を使用して各キーの値は、機能を削減マージ。あなたは合格

機能は、これらの要件を満たしていません。特に、関連性がありません:

f = lambda x,y:(x + y)/2 

f(1, f(2, 3)) 
## 1.75 
f(f(1, 2), 3) 
## 2.25 

したがって、あなたのケースでは適用されず、値を平均化しません。

値は平均化されていません。それはなぜ起こるのですか?

上記で説明した根本的な欠陥とは別に、残りのキーごとに1つの値しかないため、マージ関数を呼び出す理由はまったくありません。

私は特定のキー

に関連した平均値の値を見つけたい

だけDataFrames使用:

vals_rdd.toDF().groupBy("_1").avg() 

もののすることができますuse aggregateByKey with StatCounter(数値的に安定)またはmap ->reduceByKey ->map(数値的に不安定) 。

また、私は強くreduceByKey: How does it work internally?に偉大な答えを読んでお勧めします。

関連する問題