に基づいて平均値計算私は2つのデータフレームを持っている:分割スパークデータフレームおよび1つの列値
Class, Calculation
first, Average
Second, Sum
Third, Average
セカンドデータフレームstudentRecord
は、以下のように周り50Kエントリ有する:
classRecord
は、次のような10個の異なるエントリを有します
Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second
2番目のデータフレームから、クラスの型に基づいて、キャンプに基づいて高さ(最初のクラスは平均、2番目のクラスは合計:等)を計算したいと思います(クラスがfir黄色、白、その他の別々の平均)。ここで
//function to calculate average
def averageOnName(splitFrame : org.apache.spark.sql.DataFrame) : Array[(String, Double)] = {
val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name",$"height".cast("double")).as[(String, Double)].rdd
var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1/y._2).collect
return avg_by_key
}
//required schema for further modifications
val schema = StructType(
StructField("name", StringType, false) ::
StructField("avg", DoubleType, false) :: Nil)
// for each loop on each class type
classRecord.rdd.foreach{
//filter students based on camps
var campYellow =studentRecord.filter($"Camp" === "yellow")
var campWhite =studentRecord.filter($"Camp" === "white")
var campRed =studentRecord.filter($"Camp" === "red")
// since I know that calculation for first class is average, so representing calculation only for class first
val avgcampYellow = averageOnName(campYellow)
val avgcampWhite = averageOnName(campWhite)
val avgcampRed = averageOnName(campRed)
// union of all
val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfYellow = sqlContext.createDataFrame(rddYellow, schema)
//union with yellow camp data
val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfWhite = sqlContext.createDataFrame(rddWhite, schema)
var dfYellWhite = dfYellow.union(dfWhite)
//union with yellow,white camp data
val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
//conversion of rdd to frame
var dfRed = sqlContext.createDataFrame(rddRed, schema)
var dfYellWhiteRed = dfYellWhite .union(dfRed)
// other modifications and final result to hive
}
私は苦労しています:
1.hardcoding Yellow, red and white, there may be other camp type also.
2. Filtering same dataframe many times
3. Not able to figure out how to calculate differently according to class calculation type.
ヘルプは高く評価され 私は、次の試してみました。ありがとう。
私が正しく理解していれば、キャンプとクラスの両方に応じて平均値または合計の高さが求められますか?どのようなキャンプ/クラスのすべての組み合わせの両方の計算については、それをデータフレームに入れ、別に 'classRecord'dfの読書をしますか? – Shaido