2017-09-12 11 views
0

分析のデータ削減段階では、列合計がすべての列合計の中央値よりも小さいすべての列を削除します。データセットを持つので、 :中央値は7であるscala:列の値がすべての列の中央値未満の列を削除する

v1,v2,v3 
1 3 5 
3 4 3 

I和列

v1,v2,v3 
4 7 8 

はので、私は私が行上のストリーミング機能でこれを行うことができると思った

v2,v3 
3 5 
4 3 

V1にドロップします。しかし、これは可能ではないようです。

コード私は作品を思いついたが、非常に冗長で、Javaコード(私が間違っているという兆候として取る)とよく似ているようだ。

この操作を効率的に行う方法はありますか?

val val dfv2=DataFrameUtils.openFile(spark,"C:\\Users\\jake\\__workspace\\R\\datafiles\\ikodaDataManipulation\\VERB2.csv") 

    //return a single row dataframe with sum of each column 
    val dfv2summed:DataFrame=dfv2.groupBy().sum() 

    logger.info(s"dfv2summed col count is ${dfv2summed.schema.fieldNames.length}") 


    //get the rowValues 
    val rowValues:Array[Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames).values.toArray 

    //sort the rows 
    scala.util.Sorting.quickSort(rowValues) 

    //calculate medians (simplistically) 
    val median:Long = rowValues(rowValues.length/2) 

    //ArrayBuffer to hold column needs that need removing 
    var columnArray: ArrayBuffer[String] = ArrayBuffer[String]() 

    //get tuple key value pairs of columnName/value 
    val entries: Map[String, Long]=dfv2summed.head().getValuesMap(dfv2summed.schema.fieldNames) 
    entries.foreach 
    { 

    //find all columns where total value below median value 
    kv => 

     if(kv._2.<(median)) 
     { 
      columnArray+=kv._1 
     } 
    } 

    //drop columns 
    val dropColumns:Seq[String]=columnArray.map(s => s.substring(s.indexOf("sum(")+4,s.length-1)).toSeq 
    logger.info(s"todrop ${dropColumns.size} : ${dropColumns}") 
    val reducedDf=dfv2.drop(dropColumns: _*) 
    logger.info(s"reducedDf col count is ${reducedDf.schema.fieldNames.length}") 
+0

例のデータと予想される出力 – mtoto

+0

がなさを共有してください。上記の挿入を参照してください – Jake

答えて

1

Sparkの各列の和を計算した後、我々は、プレーンScalaにおける中央値を取得し、以上列インデックスによってこの値に等しい列だけを選択することができます。

のは、中央値を計算する関数を定義から始めましょう、それはthis exampleのわずかな修正である:

def median(seq: Seq[Long]): Long = { 
    //In order if you are not sure that 'seq' is sorted 
    val sortedSeq = seq.sortWith(_ < _) 

    if (seq.size % 2 == 1) sortedSeq(sortedSeq.size/2) 
    else { 
    val (up, down) = sortedSeq.splitAt(seq.size/2) 
    (up.last + down.head)/2 
    } 
} 

私たちは、最初のすべての列の合計を計算し、Seq[Long]に変換します

import org.apache.spark.sql.functions._ 
val sums = df.select(df.columns.map(c => sum(col(c)).alias(c)): _*) 
      .first.toSeq.asInstanceOf[Seq[Long]] 

次に、median,

val med = median(sums) 

そして維持するために列インデックスを生成するための閾値として、それを使用します。

val cols_keep = sums.zipWithIndex.filter(_._1 >= med).map(_._2) 

は最後に、我々はselect()文の中これらの指標をマップ:

df.select(cols_keep map df.columns map col: _*).show() 
+---+---+ 
| v2| v3| 
+---+---+ 
| 3| 5| 
| 4| 3| 
+---+---+ 
+0

非常に役立ちます。ありがとうございました – Jake

+0

注:import org.apache.spark.sql.functions._が必要です – Jake

関連する問題