2017-08-03 10 views
0

私は2つのデータフレームを持っているとしましょう。Spark SQL - 集計コレクション?

DF1は、さまざまな行の列Aに値{3、4、5}を持つことができます。

DF2は、さまざまな行の列Aに値{4,5,6}を持つことがあります。

distinct_set(A)を使用して、これらのすべての行が同じグループに属すると仮定して、これらを一連の別個の要素に集約できます。

この時点で、結果のデータフレームにセットがあります。そのセットを別のセットに集約する方法はありますか?基本的に、最初の集計の結果として2つのデータフレームがある場合、その結果を集計できるようにしたいと考えています。

+1

入力と予想出力の例を提供する必要があります。これはexplodeを使って別のcollect_setを使うか、UDAFを使って行うことができるようです –

答えて

0

explodeとcollect_setはこれを解決することができましたが、カスタムアグリゲータを作成してセット自体をマージするだけで意味がありました。それらの基礎となる構造はWrappedArrayです。

case class SetMergeUDAF() extends UserDefinedAggregateFunction { 

    def deterministic: Boolean = false 

    def inputSchema: StructType = StructType(StructField("input", ArrayType(LongType)) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(LongType)) :: Nil) 

    def dataType: DataType = ArrayType(LongType) 

    def initialize(buf: MutableAggregationBuffer): Unit = { 
    buf(0) = mutable.WrappedArray.empty[LongType] 
    } 

    def update(buf: MutableAggregationBuffer, input: Row): Unit = { 
    if (!input.isNullAt(0)) { 
     val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType] 
     val x = result ++ (buf.getAs[mutable.WrappedArray[Long]](0).toSet ++ input.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long] 
     buf(0) = x 
    } 
    } 

    def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = { 
    val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType] 
    val x = result ++ (buf1.getAs[mutable.WrappedArray[Long]](0).toSet ++ buf2.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long] 
    buf1(0) = x 
    } 

    def evaluate(buf: Row): Any = buf.getAs[mutable.WrappedArray[LongType]](0) 
}