2016-09-28 5 views
12

私は平らにしようとしているデータフレームを持っています。プロセスの一環として、私はそれを分解したいので、配列の列がある場合、配列の各値は別々の行を作成するために使用されます。例えば、スパークSQL NULL値を失うことなく爆発する方法

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

なるべきこれは私のコード

private DataFrame explodeDataFrame(DataFrame df) { 
    DataFrame resultDf = df; 
    for (StructField field : df.schema().fields()) { 
     if (field.dataType() instanceof ArrayType) { 
      resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); 
      resultDf.show(); 
     } 
    } 
    return resultDf; 
} 

ある問題は私のデータでは、配列の列の一部がヌルを持っているということです。その場合、行全体が削除されます。したがって、このデータフレーム:

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 
2 | Lucy | null 

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

代わりの

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 
2 | Lucy | null 

私はヌル行を失わないように、どのように私は私の配列を爆発することができなりましたか?

私は、Spark 1.5.2およびJavaを使用しています8

答えて

20

あなたはexplode_outer機能を使用することができます2.2+

スパーク:

import org.apache.spark.sql.functions.explode_outer 

df.withColumn("likes", explode_outer($"likes")).show 

// +---+----+--------+ 
// | id|name| likes| 
// +---+----+--------+ 
// | 1|Luke|baseball| 
// | 1|Luke| soccer| 
// | 2|Lucy| null| 
// +---+----+--------+ 

スパーク< = 2.1

をScalaではJavaと同等のものが必要ですほとんど同じです(個々の関数をインポートするにはimport staticを使用してください)。

import org.apache.spark.sql.functions.{array, col, explode, lit, when} 

val df = Seq(
    (1, "Luke", Some(Array("baseball", "soccer"))), 
    (2, "Lucy", None) 
).toDF("id", "name", "likes") 

df.withColumn("likes", explode(
    when(col("likes").isNotNull, col("likes")) 
    // If null explode an array<string> with a single null 
    .otherwise(array(lit(null).cast("string"))))) 

ここでの考え方は、所望の種類のarray(NULL)NULLを置き換えるために、基本的です。

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") 

val st = StructType(Seq(
    StructField("_1", IntegerType, false), StructField("_2", StringType, true) 
)) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast(st))))) 

または

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

:複合型(別名structs)のために、あなたは完全なスキーマを提供する必要が

配列ColumnfalsecontainsNullセットで作成されている場合、あなたはすべきこれを最初に変更します(Spark 2.1でテスト済み):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 
+0

偉大に見える、ありがとうございました!私はフォローアップの質問があります:もし私の列の型がStructTypeならどうですか?私はcast(new StructType())を使ってみましたが、データ型の不一致があります:THENとELSE式はすべて共通の型になります。すべての列型に適合します。 – alexgbelov

+0

また、列の型を取得するために、DataFrame.dtypes()を使用しています。列の型を取得するより良い方法はありますか? – alexgbelov

+1

a)すべてのフィールドに完全スキーマを提供する必要があります。 b) 'dtypes'または' schema'です。 – zero323

0

受け入れられた答えをフォローアップすると、配列要素が複雑な型の場合は、手で定義するのが難しい場合があります(例:大きな構造体の場合)。

が自動的にそれを行うには、私は次のヘルパーメソッドを書いた:

関連する問題