1

文字列用にカスタムAggregator[]を作成しました。カスタムスパークアグリゲータを複数の列に適用する(Spark 2.0)

DataFrameのすべての列に適用したいですが、すべての列は文字列ですが、列番号は任意です。

私は正しい表現を書いて立ち往生しています。私はこのようなものを書こうと思います:

df.agg(df.columns.map(c => myagg(df(c))) : _*) 

これは明らかに様々なインターフェイスが与えられている間違いです。

私はRelationalGroupedDataset.agg(expr: Column, exprs: Column*)コードを見ましたが、私は表現操作に慣れていません。

+3

アグリゲータコードを表示してください。あなたがしようとしていることを説明してください。 –

+0

@AssafMendelson、実際には、様々なデータ型の様々な統計のためのカスタムアグリゲーターを多数用意する予定です。クラスターShortestLongestAggregator()はAggregator [String、(String、String)、(String、String)]を継承しています。今のところ、任意のデータフレームのすべての列に対して(最短、最長の)すべての列を保持したいと考えています(与えられた文字列のみ)。 – mathieu

答えて

5

UserDefinedAggregateFunctionsとは対照的に、Aggregtorsは完全な /値を想定しています。

スニペットで使用できるAggregatorを使用する場合は、列名でパラメータ化し、値タイプとしてを使用する必要があります。

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.{Encoder, Encoders, Row} 

case class Max(col: String) 
    extends Aggregator[Row, Int, Int] with Serializable { 

    def zero = Int.MinValue 
    def reduce(acc: Int, x: Row) = 
    Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero)) 

    def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2) 
    def finish(acc: Int) = acc 

    def bufferEncoder: Encoder[Int] = Encoders.scalaInt 
    def outputEncoder: Encoder[Int] = Encoders.scalaInt 
} 

使用例:静的Dataset<Row>よりDatasetsを入力したと組み合わせると

val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z") 

@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)")) 

df.agg(exprs.head, exprs.tail: _*) 
+------+------+------+ 
|max(x)|max(y)|max(z)| 
+------+------+------+ 
|  4|  5|  3| 
+------+------+------+ 

間違いなくAggregatorsは、はるかに理にかなって。

ご要望に応じて、Seq[_]アキュムレータを使用し、(レコード)を1回の処理で1回処理することで、複数の列を1回のパスで集約することもできます。

関連する問題