2016-08-16 16 views
3

のは、私は次のようDataFrameを持っているとしましょう:PySparkのDataFrameの列配列に集計しますか?

[Row(user='bob', values=[0.5, 0.3, 0.2]), 
Row(user='bob', values=[0.1, 0.3, 0.6]), 
Row(user='bob', values=[0.8, 0.1, 0.1])] 

私はusergroupByたいと平均がこのような配列valuesの各インデックスの上に取られavg(values)のようなものだろう:

[Row(user='bob', avgerages=[0.466667, 0.233333, 0.3])] 

をPySparkでこれをどうやって行うことができますか?

答えて

6

各インデックスの配列を展開し、平均を計算することができます。

Pythonの

from pyspark.sql.functions import array, avg, col 

n = len(df.select("values").first()[0]) 

df.groupBy("user").agg(
    array(*[avg(col("values")[i]) for i in range(n)]).alias("averages") 
) 

Scalaの

import spark.implicits._ 
import org.apache.spark.functions.{avg, size} 

val df = Seq(
    ("bob", Seq(0.5, 0.3, 0.2)), 
    ("bob", Seq(0.1, 0.3, 0.6)) 
).toDF("user", "values") 

val n = df.select(size($"values")).as[Int].first 
val values = (0 to n).map(i => $"values"(i)) 

df.select($"user" +: values: _*).groupBy($"user").avg() 
+0

*このケースで何をしますか?また、Pandasのように、各グループをユーザー定義の関数に渡してそこで操作を行う方法がありますか?ありがとう。 –

+0

'*'は標準のPython引数のアンパックです。いいえ、PythonはUDAFをサポートしていません。 RDDを直接使用することも、JVMを定義することもできます。 – zero323

+0

ありがとう! RDDはここで理にかなっていると思う。 –

関連する問題