2016-10-17 16 views
3

reduceByKeyが呼び出されると、すべての値が同じキーで合計されます。各キーの平均値を計算する方法はありますか?Spark:Scalaを使用したreduceByKeyの値の平均ではなく値の平均

// I calculate the sum like this and don't know how to calculate the avg 
reduceByKey((x,y)=>(x+y)).collect 


Array(((Type1,1),4.0), ((Type1,1),9.2), ((Type1,2),8), ((Type1,2),4.5), ((Type1,3),3.5), 
((Type1,3),5.0), ((Type2,1),4.6), ((Type2,1),4), ((Type2,1),10), ((Type2,1),4.3)) 
+0

私はあなたが(タイトルのように)reduceByKeyを意味していることと思います。試したコードの例を示してください。 ReduceByKeyにはデフォルトの集約がないため、集計した場合はそれを求めます。 – Wilmerton

+2

[スパークRDDの可能な複製:統計を最も効率的に計算する方法](http://stackoverflow.com/questions/39981312/spark-rdd-how-to-calculate-statistics-most-効率的) – mtoto

+2

あなたは両方の合計が必要です分離してカウントするhttp://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-kv-rdd-in-spark-with-pyth –

答えて

0

多くの方法がありますが、単純な方法は、合計とカウントを追跡し、最後に平均を計算するクラスを使用することです。このようなものはうまくいくでしょう。

class AvgCollector(val tot: Double, val cnt: Int = 1) { 
    def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt) 
    def avg = tot/cnt 
} 

val rdd2 = { 
    rdd 
    .map{ case (k,v) => (k, new AvgCollector(v)) } 
    .reduceByKey(_ combine _) 
    .map{ case (k,v) => (k, v.avg) } 
} 

...またはあなたがクラスに微調整してaggregateByKey使用することができます

class AvgCollector(val tot: Double, val cnt: Int = 1) { 
    def ++(v: Double) = new AvgCollector(tot + v, cnt + 1) 
    def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt) 
    def avg = if (cnt > 0) tot/cnt else 0.0 
} 

rdd2 = { 
    rdd 
    .aggregateByKey(new AvgCollector(0.0,0))(_ ++ _, _ combine _) 
    .map{ case (k,v) => (k, v.avg) } 
}