2016-08-07 26 views
6

Spark DataFrame dfが与えられた場合、特定の数値列'values'の最大値を求め、その値に達した行を取得します。私はもちろん、これを行うことができます。Spark DataFramesのargmax:最大値の行を取得する方法

# it doesn't matter if I use scala or python, 
# since I hope I get this done with DataFrame API 
import pyspark.sql.functions as F 
max_value = df.select(F.max('values')).collect()[0][0] 
df.filter(df.values == max_value).show() 

をしかし、それはdfを2回通過を必要とするため、これは非効率的です。

pandas.Series/DataFramenumpy.arrayは(ワンパスで)効率的にこれを行うargmax/idxmax方法を有しています。標準的なPython(組み込み関数maxはキーパラメータを受け取りますので、最も高い値のインデックスを見つけるために使用できます)。

Sparkの正しいアプローチは何ですか?最大値が得られる行をすべて取得するか、それらの行の任意の(空ではない!)部分集合を取得するかどうかは気にしないことに注意してください。

+0

をクロス言語であり、任意のデータ上で動作することができますは良い解決策はありません。 – zero323

+0

@ zero323 DataFrame APIのRDDコードをScalaに変換し、Catalystが動作するために適切なメタデータを追加することで、以下の答えでRDDコードをラップすることはなぜ不可能ですか? – max

+0

これは可能ですが、ScalaまたはPython_を使用すると_itは問題ではないという前提をはっきりと分かりません.SQLだけで注文可能なデータ型でも行うことができますが、これは特殊なケースではありません。 – zero323

答えて

10

スキーマis Orderableあなたは、単純な集計を使用することができます(スキーマは唯一のアトミック/アトミックの配列/再帰的に注文可能な構造体が含まれている)場合:

Pythonの

df.select(F.max(
    F.struct("values", *(x for x in df.columns if x != "values")) 
)).first() 

スカラ

df.select(max(struct(
    $"values" +: df.columns.collect {case x if x!= "values" => col(x)}: _* 
))).first 

そうしないと、(のみスカラ座)Datasetの上に減らすことができますが、それは追加のデシリアライズを必要とします。一般的には

type T = ??? 

df.reduce((a, b) => if (a.getAs[T]("values") > b.getAs[T]("values")) a else b) 
+0

少し複雑ですが、この 'struct'メソッドについて読む必要があります –

+0

' Orderable'スキーマの説明/定義にリンクしていますか? Googleの検索では、この非常に答えが見つかりました:) – max

+0

https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering。スカラ#L89-L96 – zero323

2

不完全な答えかもしれませんが、DataFrameの内部RDDを使用してmaxメソッドを適用し、決定されたキーを使用して最大レコードを取得できます。

a = sc.parallelize([ 
    ("a", 1, 100), 
    ("b", 2, 120), 
    ("c", 10, 1000), 
    ("d", 14, 1000) 
    ]).toDF(["name", "id", "salary"]) 

a.rdd.max(key=lambda x: x["salary"]) # Row(name=u'c', id=10, salary=1000) 
+1

RDD APIで1回(Scalaのオーバーヘッドを避けるためのスカラ)を保証することは、DataFrame APIで2回以上の高速化が保証されますか?あるいは、Catalystがここでできる最適化がありますか? – max

関連する問題