UserDefinedAggregateFunctions
とは対照的に、Aggregtors
は完全な /値を想定しています。
スニペットで使用できるAggregator
を使用する場合は、列名でパラメータ化し、値タイプとしてを使用する必要があります。
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
使用例:静的Dataset<Row>
よりDatasets
を入力したと組み合わせると
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
間違いなくAggregators
は、はるかに理にかなって。
ご要望に応じて、Seq[_]
アキュムレータを使用し、(レコード)を1回の処理で1回処理することで、複数の列を1回のパスで集約することもできます。
アグリゲータコードを表示してください。あなたがしようとしていることを説明してください。 –
@AssafMendelson、実際には、様々なデータ型の様々な統計のためのカスタムアグリゲーターを多数用意する予定です。クラスターShortestLongestAggregator()はAggregator [String、(String、String)、(String、String)]を継承しています。今のところ、任意のデータフレームのすべての列に対して(最短、最長の)すべての列を保持したいと考えています(与えられた文字列のみ)。 – mathieu