2017-01-18 8 views
6

ネストされたレベルの構造体にフィールドを追加または置換するにはどうすればよいですか?Spark DataFrameにネストされた列を追加する

この入力:

val rdd = sc.parallelize(Seq(
    """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""", 
    """{"a": {"xX": 3},"b": {"z": 0}}""", 
    """{"a": {"XX": 3},"b": {"z": 0}}""", 
    """{"a": {"xx": 4},"b": {"z": 0}}""")) 
var df = sqlContext.read.json(rdd) 

は、次のスキーマを生成する:

root 
|-- a: struct (nullable = true) 
| |-- XX: long (nullable = true) 
| |-- xX: long (nullable = true) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

その後、私はこれを行うことができます。 https://stackoverflow.com/a/39943812/1068385

import org.apache.spark.sql.functions._ 
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withColumn("a_xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 
    .dropNestedColumn("a.xx") 

は(dropNestedColumnは、この回答から借りています。私は基本的に逆の操作を探していますその上)

そしてスキーマは次のようになります。

root 
|-- a: struct (nullable = false) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 
|-- a_xx: long (nullable = true) 

明らかにそれは交換する(または追加しません)a.xxではなく、それは、ルートレベルで新しいフィールドa_xxを追加します。

私の代わりにこれを行うことができるようにしたい:それはこのスキーマにつながるようにするため

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withNestedColumn("a.xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 

を:

root 
|-- a: struct (nullable = false) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

どのように私はそれを達成することができますか?

実際の目標は、入力JSONの列名で大文字と小文字を区別しないことです。最後のステップは簡単です:すべての重複する列名を収集し、それぞれに合体を適用します。

object DataFrameUtils { 
    private def nullableCol(parentCol: Column, c: Column): Column = { 
    when(parentCol.isNotNull, c) 
    } 

    private def nullableCol(c: Column): Column = { 
    nullableCol(c, c) 
    } 

    private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = { 
    splitted 
     .foldRight(newCol) { 
     case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName)) 
     } 
    } 

    private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = { 
    colType match { 
     case colType: StructType if splitted.nonEmpty => { 
     var modifiedFields: Seq[(String, Column)] = colType.fields 
      .map(f => { 
      var curCol = col.getField(f.name) 
      if (f.name == splitted.head) { 
       curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol) 
      } 
      (f.name, curCol as f.name) 
      }) 

     if (!modifiedFields.exists(_._1 == splitted.head)) { 
      modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head) 
     } 

     var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*) 
     if (nullable) { 
      modifiedStruct = nullableCol(col, modifiedStruct) 
     } 
     modifiedStruct 
     } 
     case _ => createNestedStructs(splitted, newCol) 
    } 
    } 

    private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = { 
    if (newColName.contains('.')) { 
     var splitted = newColName.split('.') 

     val modifiedOrAdded: (String, Column) = df.schema.fields 
     .find(_.name == splitted.head) 
     .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol))) 
     .getOrElse { 
      (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head) 
     } 

     df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2) 

    } else { 
     // Top level addition, use spark method as-is 
     df.withColumn(newColName, newCol) 
    } 
    } 

    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Add nested field to DataFrame 
     * 
     * @param newColName Dot-separated nested field name 
     * @param newCol New column value 
     */ 
    def withNestedColumn(newColName: String, newCol: Column): DataFrame = { 
     DataFrameUtils.addNestedColumn(df, newColName, newCol) 
    } 
    } 
} 

はそれを改善すること自由に感じ:

+0

はあなたが解決策を得るのですか? –

+0

@ShankarKoirala:スパークではありません。ハイブでは、私が望むものを達成するために、COALESCEを使用することは自明でした。 – Arvidaa

答えて

1

ここにそれができるよう、エレガントやとして効率的ではありませんかもしれませんが、私が思いついたものです。

val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }""")) 
val df: DataFrame = spark.read.json(data) 

val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2") 

を生成する必要があります

assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.shema.simpleString) 
+0

ありがとうございます。私は来週にそれを確認し、それが承認された回答として動作するかどうかを確認します。 – Arvidaa

+0

@Michel Lemay質問の場合はうまくいきます。ありがとう。私は構造体のネストされた配列にそれを適用しようとしていますが失敗します、それは私の実際の火花の知識のために少し遠すぎます...あなたは私を助けることができますか? – Gab

+0

実際、私たちが必要な機能ではないので、将来の改善のために残しました。これを現在のコードを使ってサポートするには、 'case _'を修正し、ネストされた構造体の配列をサポートする必要があります。ネストされた単純型は、構造体にも昇格させる必要があります。また、newColで配列をサポートし、配列の配列の要素数を多分扱う必要があります。 –

関連する問題