2017-01-07 12 views
0

私の列の1つがオブジェクトの配列であるSpark DataFrameがあります。私はその配列をフィルタリングする操作をしたいと思います。下の私の例では、私は子供がいる親を持っています。私は大人の子供だけを取得したいと思います。配列の要素にフィルタを適用する最も簡潔な方法は何ですか?Sparkの列

import spark.implicits._ 

case class Child(name: String, age: Int) 
case class Parent(name: String, children: Array[Child]) 

val rawData = Seq(Parent("Mom", Array(Child("Jane", 9))), Parent("Dad", Array(Child("Hubert", 28), Child("David", 27), Child("Jim", 25)))) 
val data = spark.createDataFrame(rawData) 

私は来ることができた最も近いです:

val adultChildren = udf((children: mutable.WrappedArray[Child]) => { 
    val rowArray = children.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]] 
    val ret = rowArray.filter(c => c.getAs[Int]("age") > 18) 
    ret.asInstanceOf[mutable.WrappedArray[Child]] 
}) 
data.select(adultChildren($"children")).show() 

これはやや面倒です。 Sparkはオブジェクトを直列化する時間が少なくて済みますが、それは冗長です。

もっと簡潔な方法がありますか?

+0

スパークのどのバージョンですか? –

+0

私は2.0.2を使用しています –

答えて

2

あなたがデータセットを使用することができるなら、それは本当に簡単になる:

data.map(_.children.filter(_.age > 18).toList) 

あなたがDataFramesに従うなら:

data.select($"name", explode($"children").as("child")) 
    .where($"child.age" > 18) 
    .groupBy($"name").agg(collect_list($"child")) 
+0

これらのアプローチのいずれかがパフォーマンスに及ぼす影響の考え方はありますか?より効率的になりそうですか? –

0

1つの改良は、関数内で定型をカプセル化することです。

import scala.reflect.runtime.universe._ 
def arrayFilterUDF[T: TypeTag](f: (GenericRowWithSchema) => Boolean) = udf((a: mutable.WrappedArray[T]) => { 
    val rowArray = a.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]] 
    rowArray.filter(f).asInstanceOf[mutable.WrappedArray[T]] 
}) 

これは、あなたが書くことができます:

val adultChildren = arrayFilterUDF[Child](_.getAs[Int]("age") > 18) 
data.select(adultChildren($"children")).show()  
関連する問題