2017-12-12 14 views
0

私は整数のデータセットを持っていますが、そのうちのいくつかは実際のデータであり、そのうちのいくつかは特定のしきい値を上回るものの一部はエラーコードです。私はまた、エラーコード範囲の始めに列名のマップを持っています。このマップを使用して条件付きで値を置き換えたいとします。たとえば、各列の行の値がエラー範囲の開始点を超えている場合は、Noneを指定します。 UDFは、特定の列(または複数)を期待し、単一の列を返し、ここでは、様々な異なる列を処理したい -マップに基づくSparkデータフレームの値の置き換え

val errors = Map("Col_1" -> 100, "Col_2" -> 10) 

val df = Seq(("john", 1, 100), ("jacob", 10, 100), ("heimer", 1000, 
1)).toDF("name", "Col_1", "Col_2") 

df.take(3) 
// name | Col_1 | Col_2 
// john | 1  | 1 
// jacob | 10 | 10 
// heimer | 1000 | 1 

//create some function like this 
def fixer = udf((column_value, column_name) => { 
    val crit_val = errors(column_name) 
    if(column_value >= crit_val) { 
     None 
    } else { 
     column_value 
    } 
} 

//apply it in some way 
val fixed_df = df.columns.map(_ -> fixer(_)) 

//to get output like this: 
fixed_df.take(3) 
// name | Col_1 | Col_2 
// john | 1  | 1 
// jacob | 10 | None 
// heimer | None | 1 

答えて

3

これは、UDFを使用してこれを行うにはあまりにも便利ではありません。さらに、しきい値をチェックして値をある定数で置き換える行為は、Sparkの組み込みメソッドwhenを使用して実行することができ、UDFは必要ありません。

だから、ここので、反復的に(私たちはnullと「悪い」の値を置き換えます)関連する列を通過し、所望のデータフレームを生成する、いくつかのしきい値を持って列ごとにwhenを使用する方法です:

import org.apache.spark.sql.functions._ 
import spark.implicits._ 

// fold the list of errors, replacing the original column 
// with a "corrected" column with same name in each iteration 
val newDf = errors.foldLeft(df) { case (tmpDF, (colName, threshold)) => 
    tmpDF.withColumn(colName, when($"$colName" > threshold, null).otherwise($"$colName")) 
} 

newDf.show() 
// +------+-----+-----+ 
// | name|Col_1|Col_2| 
// +------+-----+-----+ 
// | john| 1| 1| 
// | jacob| 10| null| 
// |heimer| null| 1| 
// +------+-----+-----+ 
+1

ありがとう@ tzach-zohar!これは完璧でした。 –

関連する問題