ネストされたレベルの構造体にフィールドを追加または置換するにはどうすればよいですか?Spark DataFrameにネストされた列を追加する
この入力:
val rdd = sc.parallelize(Seq(
"""{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""",
"""{"a": {"xX": 3},"b": {"z": 0}}""",
"""{"a": {"XX": 3},"b": {"z": 0}}""",
"""{"a": {"xx": 4},"b": {"z": 0}}"""))
var df = sqlContext.read.json(rdd)
は、次のスキーマを生成する:
root
|-- a: struct (nullable = true)
| |-- XX: long (nullable = true)
| |-- xX: long (nullable = true)
| |-- xx: long (nullable = true)
|-- b: struct (nullable = true)
| |-- z: long (nullable = true)
その後、私はこれを行うことができます。 https://stackoverflow.com/a/39943812/1068385:
import org.apache.spark.sql.functions._
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
df = df
.withColumn("a_xx",
coalesce(overlappingNames:_*))
.dropNestedColumn("a.xX")
.dropNestedColumn("a.XX")
.dropNestedColumn("a.xx")
は(dropNestedColumn
は、この回答から借りています。私は基本的に逆の操作を探していますその上)
そしてスキーマは次のようになります。
root
|-- a: struct (nullable = false)
|-- b: struct (nullable = true)
| |-- z: long (nullable = true)
|-- a_xx: long (nullable = true)
明らかにそれは交換する(または追加しません)a.xx
ではなく、それは、ルートレベルで新しいフィールドa_xx
を追加します。
私の代わりにこれを行うことができるようにしたい:それはこのスキーマにつながるようにするため
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
df = df
.withNestedColumn("a.xx",
coalesce(overlappingNames:_*))
.dropNestedColumn("a.xX")
.dropNestedColumn("a.XX")
を:
root
|-- a: struct (nullable = false)
| |-- xx: long (nullable = true)
|-- b: struct (nullable = true)
| |-- z: long (nullable = true)
どのように私はそれを達成することができますか?
実際の目標は、入力JSONの列名で大文字と小文字を区別しないことです。最後のステップは簡単です:すべての重複する列名を収集し、それぞれに合体を適用します。
object DataFrameUtils {
private def nullableCol(parentCol: Column, c: Column): Column = {
when(parentCol.isNotNull, c)
}
private def nullableCol(c: Column): Column = {
nullableCol(c, c)
}
private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = {
splitted
.foldRight(newCol) {
case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName))
}
}
private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = {
colType match {
case colType: StructType if splitted.nonEmpty => {
var modifiedFields: Seq[(String, Column)] = colType.fields
.map(f => {
var curCol = col.getField(f.name)
if (f.name == splitted.head) {
curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol)
}
(f.name, curCol as f.name)
})
if (!modifiedFields.exists(_._1 == splitted.head)) {
modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head)
}
var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*)
if (nullable) {
modifiedStruct = nullableCol(col, modifiedStruct)
}
modifiedStruct
}
case _ => createNestedStructs(splitted, newCol)
}
}
private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = {
if (newColName.contains('.')) {
var splitted = newColName.split('.')
val modifiedOrAdded: (String, Column) = df.schema.fields
.find(_.name == splitted.head)
.map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol)))
.getOrElse {
(splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head)
}
df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2)
} else {
// Top level addition, use spark method as-is
df.withColumn(newColName, newCol)
}
}
implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
/**
* Add nested field to DataFrame
*
* @param newColName Dot-separated nested field name
* @param newCol New column value
*/
def withNestedColumn(newColName: String, newCol: Column): DataFrame = {
DataFrameUtils.addNestedColumn(df, newColName, newCol)
}
}
}
はそれを改善すること自由に感じ:
はあなたが解決策を得るのですか? –
@ShankarKoirala:スパークではありません。ハイブでは、私が望むものを達成するために、COALESCEを使用することは自明でした。 – Arvidaa