2017-01-24 9 views
2

からNULL値をドロップ:スパークDATAFRAME - データフレームを考えるコラム

val df = sc.parallelize(Seq(("foo", ArrayBuffer(null,"bar",null)), ("bar", ArrayBuffer("one","two",null)))).toDF("key", "value") 
    df.show 

    +---+--------------------------+ 
    |key|      value| 
    +---+--------------------------+ 
    |foo|ArrayBuffer(null,bar,null)| 
    |bar|ArrayBuffer(one, two,null)| 
    +---+--------------------------+ 

を私がコラムvalueからnullをドロップしたいと思います。削除後のデータフレームは次のようになります。

+---+--------------------------+ 
    |key|      value| 
    +---+--------------------------+ 
    |foo|ArrayBuffer(bar)   | 
    |bar|ArrayBuffer(one, two)  | 
    +---+--------------------------+ 

10x

答えて

3

ここにはUDFが必要です。 flatMapと例えば:

map外部 OptionNULL列ハンドル
val filterOutNull = udf((xs: Seq[String]) => 
    Option(xs).map(_.flatMap(Option(_)))) 

df.withColumn("value", filterOutNull($"value")) 

Option(null: Seq[String]).map(identity) 
Option[Seq[String]] = None 
Option(Seq("foo", null, "bar")).map(identity) 
Option[Seq[String]] = Some(List(foo, null, bar)) 

をし、入力されたとき、我々はNPEで失敗しない保証しますNULL/null地図でpingの

NULL -> null -> None -> None -> NULL 
nullはScalaの null

NULLはSQL NULLです。

内部flatMapは効果的nullsをフィルタリングOptionsのシーケンスを平坦化:

Seq("foo", null, "bar").flatMap(Option(_)) 
Seq[String] = List(foo, bar) 

より急務と同等はこのようなものが考えられます。

val imperativeFilterOutNull = udf((xs: Seq[String]) => 
    if (xs == null) xs 
    else for { 
    x <- xs 
    if x != null 
    } yield x) 
2

オプション1:UDFを使用した:

val filterNull = udf((arr : Seq[String]) => arr.filter((x: String) => x != null)) 
df.withColumn("value", filterNull($"value")).show() 

オプション2:これはあまり効率的であることなしUDF

df.withColumn("value", explode($"value")).filter($"value".isNotNull).groupBy("key").agg(collect_list($"value")).show() 

注...

+0

クリア。ありがとう! – Toren

関連する問題