2016-06-21 6 views
3

Spark 1.5.0を使用しています。私は、次の列を持つスパークデータフレームを持っている:私がやりたい何spark dataframe列のグループの最上位行をグループ化、並べ替え、選択する

| user_id | description | fName | weight | 

は、ユーザごと(データ型ダブルのある列の重みの値に基づいて)、トップ10と下10行を選択することです。 SparkSQLやデータフレーム操作を使用してその作業を行うにはどうすればよいですか?

たとえば、簡単にするために、ユーザーあたりの上位2行(重量に基づく)のみを選択しています。絶対重量の値でo/pをソートしたいと思います。ここで

u1 desc1 f1 -0.20 
u1 desc1 f1 +0.20 
u2 desc1 f1 0.80 
u2 desc1 f1 -0.60 
u1 desc1 f1 1.10 
u1 desc1 f1 6.40 
u2 desc1 f1 0.05 
u1 desc1 f1 -3.20 
u2 desc1 f1 0.50 
u2 desc1 f1 -0.70 
u2 desc1 f1 -0.80 

希望のO/Pである:

u1 desc1 f1 6.40 
u1 desc1 f1 -3.20 
u1 desc1 f1 1.10 
u1 desc1 f1 -0.20 
u2 desc1 f1 0.80 
u2 desc1 f1 -0.80 
u2 desc1 f1 -0.70 
u2 desc1 f1 0.50 

答えて

1

あなたがrow_numberとウィンドウ関数を使用することができます。

import org.apache.spark.sql.functions.row_number 
import org.apache.spark.sql.expressions.Window 

val w = Window.partitionBy($"user_id") 
val rankAsc = row_number().over(w.orderBy($"weight")).alias("rank_asc") 
val rankDesc = row_number().over(w.orderBy($"weight".desc)).alias("rank_desc") 

df.select($"*", rankAsc, rankDesc).filter($"rank_asc" <= 2 || $"rank_desc" <= 2) 

スパークであなたの代わりにrow_numberrowNumberを使用することができます1.5.0。

+0

私はrow_numberがSpark 1.5に含まれていないと思います。このエラーが発生しました:scala> import org.apache.spark.sql.function_row_number :29:エラー:値row_numberはオブジェクトorg.apache.spark.sql.functionsのメンバーではありません import org.apache.spark .sql.functions.row_number – user3803714

+0

あなたはそうですが、関数名が異なっていても解決策は同じです( 'rowNumber')。 – zero323

+0

別々に計算された行番号はどのようにデータセットdfレコードに関連付けられていますか? –

関連する問題