2017-02-19 11 views
-1

I 4列(c1、c2、c3、c4)のデータをいくつか持っていて、いくつかのスカラコードでRDDに取り込んだ。Spark - 1つの列にグループ化して他の列の平均を見つける

私は/ binをc1でグループ化し、それぞれのc1グループのc2の平均とc3の平均とc4を見つけたいと思います。

私はRDD:reduceByKeyを見ていますが、使用方法を正確に理解できていません。これを行うより良い方法はありますか? Scala APIからこれをどうやって行うことができますか?

答えて

2

あなたはおそらく(いずれか、あまり効率的であり、この場合は、おそらくあまり直感的)RDD APIを使うべきではありませんので、あなたは、DataFrameを持っていると言う - ここDataFrame APIを使用したソリューションです:

import org.apache.spark.sql.functions._ 

val result = df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4")) 

root 
|-- c1: string (nullable = true) 
|-- avg(c2): double (nullable = true) 
|-- avg(c3): double (nullable = true) 
|-- avg(c4): double (nullable = true) 

EDIT:

resultは、以下のスキーマ(c1を想定することで開始する文字列である)を有するデータフレームであろう

列のリストが動的である場合には、あなたが簡単に対応する「手段」のリストに、このようなリストをマッピングし、そのリストを使用してDFを集約することができます:完全性については

val colsToCompute = List("c2", "c3", "c4") // can be loaded dynamically 
val means: Seq[Column] = colsToCompute.map(mean) 
val result = df.groupBy("c1").agg(means.head, means.tail: _*) 

- ここですRDD APIを使用したソリューションが、:

  • それは、列の動的な数の「generify」するためにはるかに困難です
  • はるかに少ない簡潔だ
  • ことがあり若干短く実装することはなく、はるかに簡単かもしれない

悪化し実行することがあります:

val rdd: RDD[(String, Int, Int, Int)] = ... 

val result: RDD[(String, (Double, Double, Double))] = rdd 
    .keyBy(_._1) 
    .mapValues { case (k, v1, v2, v3) => (1, v1, v2, v3) } // add base for counter 
    .reduceByKey { case ((a1, a2, a3, a4), (b1, b2, b3, b4)) => (a1+b1, a2+b2, a3+b3, a4+b4) } // sum counter and values 
    .mapValues { case (count, v1, v2, v3) => (v1.toDouble/count, v2.toDouble/count, v3.toDouble/count) } // calculate means 
+0

私は今、データフレームを持っていないが、私はので、RDD経由でそれをやって見ています最終的にデータは動的になります。それは4列または3列を持つことができます。プログラムは、設定ファイルを調べることによって、Meanにする列の数を調べます。 –

+0

"最終的にデータは動的になりますが、4列または3列を持つことができます" - これもDataFrameで簡単にできます... –

関連する問題