2016-03-30 11 views
2

ユーザー指定の列名とユーザー指定の集計マップを取得するカスタムグループ化と集計関数を作成します。 私は、列名と集計マップの先頭がであるかどうかわかりません。以下のような関数を書いてみたい。しかし私はScalaには新しく、解決できません。Scala-Sparkパラメータ値を持つgroupbyとaggを動的に呼び出します。

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated.show() 
} 

val listOfStrings = List("A", "B", "C") 
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings) 

私はこれをどのように行うことができますようにそれを呼び出すようにしたいですか? 誰も助けてください。

答えて

4

あなたのコードは、ほぼ正しいです - 二つの問題で:

  1. あなたの関数の戻り値の型はDataFrameですが、最後の行はUnitを返す、aggregated.show()です。最初の列、その後、残りの列を: - ので、あなたが一致する引数を渡す必要がありcol1: String, cols: String*次のような引数を期待DataFrame.groupBy

  2. aggregated自体を返すためにshowへの呼び出しを削除するか、または単にaggの結果を返すすぐ

    df.groupBy(cols.head, cols.tail: _*)

を要するに、あなたの関数は次のようになります:引数のリストとして、次のようにそれを行うことができます

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols.head, cols.tail: _*) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated 
} 

あるいは、同様の短いバージョン:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = { 
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun) 
} 

あなたはを行う場合は、あなたの関数内でshowを呼び出したい:そのため

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols.head, cols.tail: _*) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated.show() 
    aggregated 
} 
+0

おかげで多くのことを。はい。 'df.groupBy(cols.head、cols.tail:_ *)'これは主に私が考えることができなかったものです。 2番目のバージョンは私が必要なものです。残りはローカルテスト用です。 – NehaM

+0

私の場合、val key = List( "key1"、 "key2")と同じように機能しました。val grouped = df.groupBy(cols.head、cols:_ *) – Nitin

関連する問題