1
Spark UDAFを記述したいのですが、その列の型はScala Numericが定義されているものになります。私はインターネットで検索しましたが、DoubleType、LongTypeなどの具体的な型の例しか見つかりませんでした。これは可能ではありませんか?しかし、そのUDAFを他の数値でどうやって使うのですか?Spark UDAF - 入力タイプとしてジェネリックを使用していますか?
Spark UDAFを記述したいのですが、その列の型はScala Numericが定義されているものになります。私はインターネットで検索しましたが、DoubleType、LongTypeなどの具体的な型の例しか見つかりませんでした。これは可能ではありませんか?しかし、そのUDAFを他の数値でどうやって使うのですか?Spark UDAF - 入力タイプとしてジェネリックを使用していますか?
簡潔にするため、カスタムsum
を定義するとします。
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
case class MySum [T : TypeTag](implicit n: Numeric[T])
extends UserDefinedAggregateFunction {
val dt = schemaFor[T].dataType
def inputSchema = new StructType().add("x", dt)
def bufferSchema = new StructType().add("x", dt)
def dataType = dt
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, n.zero)
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, n.plus(buffer.getAs[T](0), input.getAs[T](0)))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, n.plus(buffer1.getAs[T](0), buffer2.getAs[T](0)))
}
def evaluate(buffer: Row) = buffer.getAs[T](0)
}
我々は特定の種類を扱うインスタンスを作成することができ、上記のように定義した関数で:
をval sumOfLong = MySum[Long]
spark.range(10).select(sumOfLong($"id")).show
+---------+
|mysum(id)|
+---------+
| 45|
+---------+
をあなたは入力タイプにTypeTag
を提供し、スキーマを定義するためにScalaのリフレクションを使用しています注:
定義する必要がある組み込み集計関数と同じ柔軟性を得るにはあなた自身のAggregateFunction
、ImperativeAggregate
またはDeclarativeAggregate
などです。これは可能ですが、内部APIです。