2017-05-17 4 views
0

私たちは、このようないくつかのコードを持っています。dfはデータフレームであるSpark KeyValueGroupedDatasetのaggメソッドの使用方法は?

// think of class A as a table with two columns 
case class A(property1: String, property2: Long) 

// class B adds a column to class A 
case class B(property1: String, property2: Long, property3: String) 

df.as[A].map[B](a => { 
     val my_udf = // some code here which creates a user defined function 
     new B(a.property1, a.property2, my_udf(a)) 
    }) 

を。次の我々はスパークでこれを行うことができますどのように我々はこの

select property1, property3, avg(property2), count(*) from B group by property1, property3 

ようなSQLで作成していたタイプC

// we want to group objects of type B by properties 1 and 3 and compute the average of property2 and also want to store a count 
case class C(property1: String, property3: String, average: Long, count: Long) 

のデータセットを作成したいですか? KeyValueGroupedDataSetと一緒にaggというgroupByKeyを使用しようとしていますが、動作させることはできません。 (groupBy.aggを使用)を使用すると、タイプCのデータセットがds_cと呼ばれている場合は、あなたが行うことができますagg

答えて

2

を使用する方法を見つけ出すことはできません:

ds_c.groupBy("property1", "property3").agg(count($"property2").as("count"), 
              avg($"property2").as("mean")) 
+0

いくつかの問題、私は任意の数が表示されていない1および平均キーワード。私は、このコードが機能するように私たちのデータセットの列をどのように名付けますか? – morpheus

+0

これらの関数をスコープで取得するには、 'import org.apache.spark.sql.functions._'が必要です。 – Garren

+0

すべての一般的な列関数については、[ここ](https://spark.apache.org/docs/2.0.2/api/scala/#org.apache.spark.sql.functions$)をチェックすることができます。データフレームの名前を変更する場合は、 'withColumnRenamed(old name、new name)'を使用できます。 – Psidom

関連する問題