分析のデータ削減段階では、列合計がすべての列合計の中央値よりも小さいすべての列を削除します。データセットを持つので、 :中央値は7であるscala:列の値がすべての列の中央値未満の列を削除する
v1,v2,v3
1 3 5
3 4 3
I和列
v1,v2,v3
4 7 8
はので、私は私が行上のストリーミング機能でこれを行うことができると思った
v2,v3
3 5
4 3
V1にドロップします。しかし、これは可能ではないようです。
コード私は作品を思いついたが、非常に冗長で、Javaコード(私が間違っているという兆候として取る)とよく似ているようだ。
この操作を効率的に行う方法はありますか?
val val dfv2=DataFrameUtils.openFile(spark,"C:\\Users\\jake\\__workspace\\R\\datafiles\\ikodaDataManipulation\\VERB2.csv")
//return a single row dataframe with sum of each column
val dfv2summed:DataFrame=dfv2.groupBy().sum()
logger.info(s"dfv2summed col count is ${dfv2summed.schema.fieldNames.length}")
//get the rowValues
val rowValues:Array[Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames).values.toArray
//sort the rows
scala.util.Sorting.quickSort(rowValues)
//calculate medians (simplistically)
val median:Long = rowValues(rowValues.length/2)
//ArrayBuffer to hold column needs that need removing
var columnArray: ArrayBuffer[String] = ArrayBuffer[String]()
//get tuple key value pairs of columnName/value
val entries: Map[String, Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames)
entries.foreach
{
//find all columns where total value below median value
kv =>
if(kv._2.<(median))
{
columnArray+=kv._1
}
}
//drop columns
val dropColumns:Seq[String]=columnArray.map(s => s.substring(s.indexOf("sum(")+4,s.length-1)).toSeq
logger.info(s"todrop ${dropColumns.size} : ${dropColumns}")
val reducedDf=dfv2.drop(dropColumns: _*)
logger.info(s"reducedDf col count is ${reducedDf.schema.fieldNames.length}")
例のデータと予想される出力 – mtoto
がなさを共有してください。上記の挿入を参照してください – Jake