2017-07-11 38 views
2

Scala Sparkを使用すると、集計された列を丸めるために型付きのDataset APIを使用するにはどうすればよいですか?Spark Datasetで列を丸める方法はありますか?

また、groupby操作でデータセットのタイプを保持するにはどうすればよいですか?

これは私が現在持っているものです。

case class MyRow(
    k1: String, 
    k2: String, 
    c1: Double, 
    c2: Double 
) 

def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = { 
import org.apache.spark.sql.expressions.scalalang.typed._ 
ds.groupByKey(row => (row.k1, row.k2)) 
    .agg(
    avg(_.c1), 
    avg(_.c2) 
) 
    .map(r => MyRow(r._1._1, r._1._2, r._2, r._3)) 
} 
  1. 私はround(avg(_.c1))avg(_.c1)を交換する場合、私は型エラーを取得します。値を丸める正しい方法は何ですか?
  2. .map(...)ラインが気に入らない - データセットのタイプを保持するためのよりエレガントな方法がありますか?

ありがとう! aggタイプTypedColumn[IN, OUT]の集計関数を期待し、ラウンドがColumn(データフレームで使用するのに適した)を提供するためroundを使用

答えて

2

は、実際、型エラーで失敗します。あなたがここに必要なもの

org.apache.spark.sql.expressions.scalalang.typed._に供給されていない丸め平均集約関数、である - しかし、あなたはかなり簡単に平均的な集約を行い、クラスを拡張して1を自分で作成することができます。

// Extend TypedAverage - round the result before returning it 
class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) { 
    override def finish(reduction: (Double, Long)): Double = math.round(super.finish(reduction)) 
} 

// A nice wrapper to create the TypedRoundAverage for a given function 
def roundAvg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedRoundAverage(f).toColumn 

// Now you can use "roundAvg" instead of "round" 
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = { 
    ds.groupByKey(row => (row.k1, row.k2)) 
    .agg(
     roundAvg(_.c1), 
     roundAvg(_.c2) 
    ) 
    .map { case ((k1, k2), c1, c2) => MyRow(k1, k2, c1, c2) } // just a nicer way to put it 
} 

I group-byが必然的にタプルを返すので、mapの操作を取り除く方法は見当たりませんが、パターンマッチングを使ってよりうまくいくようにすることができます。

2

答えは受け入れられますが、より一般的ですラウンドを使用することもできます。 .as[T](avgへの型定義も必要になります)を使用して四捨五入した後に列型を作成するだけです。

.agg(
    // Alternative ways to define a type to avg 
    round(avg((r: MyRow) => r.c1)).as[Double], 
    round(avg[MyRow](_.c2)).as[Double] 
) 
関連する問題