2017-02-03 5 views
3

最大値を持っている:私は含まと列を持つ新しいデータフレームをしたいスカラ - DATAFRAMEでスパークは、行のために、とカラム名を取得し、私はDATAFRAME持っ

name  column1 column2 column3 column4 
first 2  1  2.1  5.4 
test  1.5  0.5  0.9  3.7 
choose 7  2.9  9.1  2.5 

を持つ列名がの最大値を持っています行:

| name | max_column | 
|--------|------------| 
| first | column4 | 
| test | column4 | 
| choose | column3 | 

ありがとうございます。

+1

参照http://stackoverflow.com/a/42487191/3297229 – Wilmerton

答えて

1

私は私の最終的な解決策ポストたい:

val finalDf = originalDf.withColumn("name", maxValAsMap(keys, values)).select("cookie_id", "max_column") 

val maxValAsMap = udf((keys: Seq[String], values: Seq[Any]) => { 

    val valueMap:Map[String,Double] = (keys zip values).filter(_._2.isInstanceOf[Double]).map{ 
     case (x,y) => (x, y.asInstanceOf[Double]) 
    }.toMap 

    if (valueMap.isEmpty) "not computed" else valueMap.maxBy(_._2)._1 
    }) 

非常に高速です。

3

UDFを書くにはより良い方法があります。しかし、これはあなたがRDDへの迂回を行うと「getValuesMap」を使用して仕事を得る作業溶液

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate 

//implicits for magic functions like .toDf 
import spark.implicits._ 

import org.apache.spark.sql.functions.udf 

//We have hard code number of params as UDF don't support variable number of args 
val maxval = udf((c1: Double, c2: Double, c3: Double, c4: Double) => 
    if(c1 >= c2 && c1 >= c3 && c1 >= c4) 
    "column1" 
    else if(c2 >= c1 && c2 >= c3 && c2 >= c4) 
    "column2" 
    else if(c3 >= c1 && c3 >= c2 && c3 >= c4) 
    "column3" 
    else 
    "column4" 
) 

//create schema class 
case class Record(name: String, 
        column1: Double, 
        column2: Double, 
        column3: Double, 
        column4: Double) 

val df = Seq(
    Record("first", 2.0, 1, 2.1, 5.4), 
    Record("test", 1.5, 0.5, 0.9, 3.7), 
    Record("choose", 7, 2.9, 9.1, 2.5) 
).toDF(); 

df.withColumn("max_column", maxval($"column1", $"column2", $"column3", $"column4")) 
    .select("name", "max_column").show 

出力

+------+----------+ 
| name|max_column| 
+------+----------+ 
| first| column4| 
| test| column4| 
|choose| column3| 
+------+----------+ 
+0

ありがとう!列が少ない場合、このソリューションは非常にうまく機能します。 –

+0

列数が約30であれば提案はありますか? –

3

にある可能性があります。

val dfIn = Seq(
    ("first", 2.0, 1., 2.1, 5.4), 
    ("test", 1.5, 0.5, 0.9, 3.7), 
    ("choose", 7., 2.9, 9.1, 2.5) 
).toDF("name","column1","column2","column3","column4") 

簡単な解決策は

val dfOut = dfIn.rdd 
    .map(r => (
     r.getString(0), 
     r.getValuesMap[Double](r.schema.fieldNames.filter(_!="name")) 
    )) 
    .map{case (n,m) => (n,m.maxBy(_._2)._1)} 
    .toDF("name","max_column") 

あるしかし、あなたは、元のデータフレームから(Scala/Spark dataframes: find the column name corresponding to the maxのように)すべての列を取りたい場合は、行をマージして、スキーマを拡張して少しプレイしています

import org.apache.spark.sql.types.{StructType,StructField,StringType} 
import org.apache.spark.sql.Row 
val dfOut = sqlContext.createDataFrame(
    dfIn.rdd 
    .map(r => (r, r.getValuesMap[Double](r.schema.fieldNames.drop(1)))) 
    .map{case (r,m) => Row.merge(r,(Row(m.maxBy(_._2)._1)))}, 
    dfIn.schema.add(StructField("max_column",StringType)) 
) 
関連する問題