2017-03-06 7 views
7

値を最大にしてすべての列を保持する方法(グループあたりの最大レコード数)?以下のデータフレームを考える

+----+-----+---+-----+ 
| uid| k| v|count| 
+----+-----+---+-----+ 
| a|pref1| b| 168| 
| a|pref3| h| 168| 
| a|pref3| t| 63| 
| a|pref3| k| 84| 
| a|pref1| e| 84| 
| a|pref2| z| 105| 
+----+-----+---+-----+ 

がどのように私は、uidから最大値を取得kが、vを含めることができますか?

+----+-----+---+----------+ 
| uid| k| v|max(count)| 
+----+-----+---+----------+ 
| a|pref1| b|  168| 
| a|pref3| h|  168| 
| a|pref2| z|  105| 
+----+-----+---+----------+ 

私はこのような何かを行うことができますが、それはコラム「V」ドロップします:あなたはウィンドウ関数を使用することができます

df.groupBy("uid", "k").max("count") 

答えて

6

これは完璧ですウィンドウ演算子の例(over関数を使用)またはjoin

すでにウィンドウの使い方を理解しているので、私はjoinに専念しています。

scala> val inventory = Seq(
    | ("a", "pref1", "b", 168), 
    | ("a", "pref3", "h", 168), 
    | ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count") 
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields] 

scala> val maxCount = inventory.groupBy("uid", "k").max("count") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+----------+ 
|uid| k|max(count)| 
+---+-----+----------+ 
| a|pref3|  168| 
| a|pref1|  168| 
+---+-----+----------+ 

scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+---+ 
|uid| k|max| 
+---+-----+---+ 
| a|pref3|168| 
| a|pref1|168| 
+---+-----+---+ 

scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show 
+---+-----+---+---+-----+ 
|uid| k|max| v|count| 
+---+-----+---+---+-----+ 
| a|pref3|168| h| 168| 
| a|pref1|168| b| 168| 
+---+-----+---+---+-----+ 
4

を:

from pyspark.sql.functions import max as max_ 
from pyspark.sql.window import Window 

w = Window.partitionBy("uid", "k") 

df.withColumn("max_count", max_("count").over(w)) 
+0

ほとんどの場合、最大値の列が追加されますが、すべての行が保持されます。 – jfgosselin

2

ここに私が思いついた最善の解決策ですこれまでのところ:

val w = Window.partitionBy("uid","k").orderBy(col("count").desc) 

df.withColumn("rank", dense_rank().over(w)).select("uid", "k","v","count").where("rank == 1").show 
関連する問題