2

私は、(PySparkを使って)Sparkデータフレーム上のグループ分位数を計算したいと思います。おおよその正確な結果が良いでしょう。私はgroupBy/aggのコンテキスト内で使用できるソリューションを好むので、他のPySpark集約関数と混在させることができます。何らかの理由でこれが不可能な場合は、別の方法でも問題ありません。PySparkグループ内のメジアン/クオンタムByB

This questionは、関連性がありますが、approxQuantileを集計関数として使用する方法を示していません。

また、私はpercentile_approxハイブUDFにアクセスできますが、集計関数としてどのように使用するのか分かりません。特異性のために

、私は、次のデータフレームがあるとは:

from pyspark import SparkContext 
import pyspark.sql.functions as f 

sc = SparkContext()  

df = sc.parallelize([ 
    ['A', 1], 
    ['A', 2], 
    ['A', 3], 
    ['B', 4], 
    ['B', 5], 
    ['B', 6], 
]).toDF(('grp', 'val')) 

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val')) 
df_grp.show() 

期待される結果は次のとおりです。

+----+-------+ 
| grp|med_val| 
+----+-------+ 
| A|  2| 
| B|  5| 
+----+-------+ 
+0

いくつかのサンプルデータとともに、正確に何を達成しようとしているのかの明確な例を提供してください - なぜあなたのケースにリンクされた答えが当てはまりませんか? – desertnaut

+0

短い答えは、質問も回答も「グループ」または「集約」という単語しかし、あなたが提案したように質問を更新します。 – abeboparebop

+0

私はあなたが基礎的なrddと分散分数を計算するためのアルゴリズムを使用して、この例で自分自身をロールバックできると思います。 [ここ](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest)とその中のリンク。実際にリンクしているgithubには、いくつかのpysparkの例があります。 – ags29

答えて

4

あなたはpercentile_approxへのアクセス権を持っているので、1つの簡単な解決策は、になりますSQLコマンドで使用してください:

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df.registerTempTable("df") 
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp") 
+0

これはうまくいきますが、私はPySparkレベルで 'groupBy' /' agg'の中で使うことができます(他のPySpark集合関数と簡単に混合できるように)。 – abeboparebop

+0

@abeboparebop私は 'groupBy'と' agg'だけを使うことはできませんが、ウィンドウベースのアプローチを使うこともできます。 – Shaido

+1

私は、理想的な解決方法を明確にしました。明らかにこの回答は仕事ですが、それは私が欲しいものではありません。私は質問をしばらく開いたままにして、よりクリーンな回答が出てくるかどうかを見ていきます。 – abeboparebop

4

残念なことに、私の知る限り、「純粋な」PySparkコマンド(Shaidoの解決策はSQLの回避策を提供しています)でこれを行うことはできないようです。理由は非常に基本的です。 mean,approxQuantileなどの他の集計関数との対比は、Columnタイプを返しませんが、リストを返します。

はのは、あなたのサンプルデータを簡単な例を見てみましょう:

spark.version 
# u'2.2.0' 

import pyspark.sql.functions as func 
from pyspark.sql import DataFrameStatFunctions as statFunc 

# aggregate with mean works OK: 
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val')) 
df_grp_mean.show() 
# +---+--------+ 
# |grp|mean_val| 
# +---+--------+ 
# | B|  5.0| 
# | A|  2.0| 
# +---+--------+ 

# try aggregating by median: 
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# AssertionError: all exprs should be Column 

# mean aggregation is a Column, but median is a list: 

type(func.mean(df['val'])) 
# pyspark.sql.column.Column 

type(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# list 

私は、ウィンドウベースのアプローチは、私は根本的な理由は非常に基本的なものです言ったようにするので、任意の違いを生むだろうことを疑います。

詳細はmy answer hereも参照してください。