2017-04-06 6 views
1

Spark UDAFを記述したいのですが、その列の型はScala Numericが定義されているものになります。私はインターネットで検索しましたが、DoubleType、LongTypeなどの具体的な型の例しか見つかりませんでした。これは可能ではありませんか?しかし、そのUDAFを他の数値でどうやって使うのですか?Spark UDAF - 入力タイプとしてジェネリックを使用していますか?

答えて

5

簡潔にするため、カスタムsumを定義するとします。

import org.apache.spark.sql.expressions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 
import scala.reflect.runtime.universe._ 
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 

case class MySum [T : TypeTag](implicit n: Numeric[T]) 
    extends UserDefinedAggregateFunction { 

    val dt = schemaFor[T].dataType 
    def inputSchema = new StructType().add("x", dt) 
    def bufferSchema = new StructType().add("x", dt) 

    def dataType = dt 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero) 
    def update(buffer: MutableAggregationBuffer, input: Row) = { 
    if (!input.isNullAt(0)) 
     buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0))) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
    buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))  
    } 

    def evaluate(buffer: Row) = buffer.getAs[T](0) 
} 

我々は特定の種類を扱うインスタンスを作成することができ、上記のように定義した関数で:

val sumOfLong = MySum[Long] 
spark.range(10).select(sumOfLong($"id")).show 
+---------+ 
|mysum(id)| 
+---------+ 
|  45| 
+---------+ 

をあなたは入力タイプにTypeTagを提供し、スキーマを定義するためにScalaのリフレクションを使用しています

定義する必要がある組み込み集計関数と同じ柔軟性を得るにはあなた自身のAggregateFunctionImperativeAggregateまたはDeclarativeAggregateなどです。これは可能ですが、内部APIです。

関連する問題