I 4列(c1、c2、c3、c4)のデータをいくつか持っていて、いくつかのスカラコードでRDDに取り込んだ。Spark - 1つの列にグループ化して他の列の平均を見つける
私は/ binをc1でグループ化し、それぞれのc1グループのc2の平均とc3の平均とc4を見つけたいと思います。
私はRDD:reduceByKeyを見ていますが、使用方法を正確に理解できていません。これを行うより良い方法はありますか? Scala APIからこれをどうやって行うことができますか?
I 4列(c1、c2、c3、c4)のデータをいくつか持っていて、いくつかのスカラコードでRDDに取り込んだ。Spark - 1つの列にグループ化して他の列の平均を見つける
私は/ binをc1でグループ化し、それぞれのc1グループのc2の平均とc3の平均とc4を見つけたいと思います。
私はRDD:reduceByKeyを見ていますが、使用方法を正確に理解できていません。これを行うより良い方法はありますか? Scala APIからこれをどうやって行うことができますか?
あなたはおそらく(いずれか、あまり効率的であり、この場合は、おそらくあまり直感的)RDD
APIを使うべきではありませんので、あなたは、DataFrame
を持っていると言う - ここDataFrame
APIを使用したソリューションです:
import org.apache.spark.sql.functions._
val result = df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4"))
root
|-- c1: string (nullable = true)
|-- avg(c2): double (nullable = true)
|-- avg(c3): double (nullable = true)
|-- avg(c4): double (nullable = true)
EDIT:
result
は、以下のスキーマ(c1
を想定することで開始する文字列である)を有するデータフレームであろう:
列のリストが動的である場合には、あなたが簡単に対応する「手段」のリストに、このようなリストをマッピングし、そのリストを使用してDFを集約することができます:完全性については
val colsToCompute = List("c2", "c3", "c4") // can be loaded dynamically
val means: Seq[Column] = colsToCompute.map(mean)
val result = df.groupBy("c1").agg(means.head, means.tail: _*)
- ここですRDD APIを使用したソリューションが、:
悪化し実行することがあります:
val rdd: RDD[(String, Int, Int, Int)] = ...
val result: RDD[(String, (Double, Double, Double))] = rdd
.keyBy(_._1)
.mapValues { case (k, v1, v2, v3) => (1, v1, v2, v3) } // add base for counter
.reduceByKey { case ((a1, a2, a3, a4), (b1, b2, b3, b4)) => (a1+b1, a2+b2, a3+b3, a4+b4) } // sum counter and values
.mapValues { case (count, v1, v2, v3) => (v1.toDouble/count, v2.toDouble/count, v3.toDouble/count) } // calculate means
私は今、データフレームを持っていないが、私はので、RDD経由でそれをやって見ています最終的にデータは動的になります。それは4列または3列を持つことができます。プログラムは、設定ファイルを調べることによって、Meanにする列の数を調べます。 –
"最終的にデータは動的になりますが、4列または3列を持つことができます" - これもDataFrameで簡単にできます... –