2017-08-30 2 views
1

Spark Dataframeで移動するパーセンタイルをきれいに計算する方法はありますか?Spark Dataframeでのパーセンタイルの移動

私は巨大なデータフレームを持っています。私は15分ごとにそれを集計しています。私はパーセンテイルを各部分で計算したいと思います。

df.groupBy(window(col("date").cast("timestamp"), "15 minutes")) 
    .agg(sum("session"),mean("session"),percentile_approx("session", 0.5)) 
    .show() 

エラー:見つかりません:値percentile_approx

は、だから私は、合計、平均などの基本的な事柄を計算する必要があるが、私は、中央値およびいくつかの他のパーセンタイルを計算する必要があります。

Spark 2.1で効率的な方法がありますか?

ここでは、何の中央値percentile_approx、それはそうAPIで実装Percentile_approx機能はありませんので。

私はこの質問を既に聞いてきましたが、答えがすべてユニークな解決策に同意しているとは限りませんでした。そして私にとってはあまりにもぼんやりしていました...だから、2017年8月に良いと効率的な解決策があるかどうかを知りたかったのです。

15分のウィンドウを通過するとき、ちょうど近似ではなくハードコンピューティングでは動作しないのだろうか?あなたの注意のための

どうもありがとう、

は良い午後を持っています!

PS:ScalaまたはPySpark私は気にしません、両方ともさらに大きくなるでしょう!

+0

サンプルコードで 'window'とは何ですか?ウィンドウ機能(したがってスライディングウィンドウ)または重複しないウィンドウ(groupBy)が必要ですか? –

+0

あなたの答えをお寄せいただきありがとうございました。履歴データがあり、1分ごとに集計したい1分ごとに何百ものレコードがあり、スライディングウィンドウごとに(毎分ごとに)中央値などを計算する必要があります。だから私は効率的に行うには何がきれいな方法であるのだろうと思っていました – tricky

+0

ウィンドウは実際には「スライディング」していません。スライディングウィンドウの場合、ウィンドウ関数が必要になるからです。AFAIKスライドはあなたの場合を意味します:すべてのレコードについて、「周囲」のデータを15分とし、集計を計算します –

答えて

1

[OK]をので、私は私が推測かなり間抜けだった:あなたのいずれかがあなた自身のUDAFを実装するか、次の方法を使用する必要があるので、私の知る限り何パーセンタイル集計関数は、ありません。

私は以前のアイデアにcallUDFを追加するだけでした:%%%%、私の場合は例えばだから、意見の相違

callUDF("percentile_approx", col("session"), lit(0.5)) 

のため申し訳ありませんが、私は毎分2ヶ月の歴史的なデータセットを集約したい:

df.groupBy(window((col("date")/1000).cast("timestamp"), "1 minutes")) 
.agg(sum("session"),mean("session"),callUDF("percentile_approx", col("session"), lit(0.5))) 
.show() 

(これmilisecondのタイムスタンプ/1000

+0

それはすばらしい、私は集計関数として 'パーセンタイル 'を使うことができるのか分からなかった! –

+0

ちょうど明確にするために:percentile_approx(あなたはちょうどパーセンタイルを使用することもできます)は、組み込みのHIVE UDAFです(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF) ))、それはsparkによって実装されるのではなく、ハイブによって実装されます(ハイブサポート(またはhiveContext)がある場合にのみ使用できます)。 –

1

スライディング(重複)する必要がない場合は、groupByを使用して実行できます。

val df = (1 to 100).map(i => (
    i/10, scala.util.Random.nextDouble) 
).toDF("time","session") 

val calcStats = udf((data:Seq[Double]) => { 
    (data.sum, 
    data.sum/data.size, 
    data.sorted.apply(data.size/2) // is ~ median, replace with your desired logic 
) 
}) 

df.groupBy($"time") 
    .agg(collect_list($"session").as("sessions")) 
    .withColumn("stats",calcStats($"sessions").cast("struct<sum:double,mean:double,median:double>")) 
    .select($"time",$"stats.*") 
    .orderBy($"time") 
    .show 

+----+------------------+-------------------+-------------------+ 
|time|    sum|    mean|    median| 
+----+------------------+-------------------+-------------------+ 
| 0|3.5441618790222287| 0.3937957643358032| 0.3968893251191352| 
| 1|3.6612518806543757| 0.3661251880654376| 0.4395039388994335| 
| 2| 4.040992655970037|0.40409926559700365| 0.3522214051715915| 
| 3| 4.583175830988081| 0.4583175830988081| 0.5800394949546751| 
| 4| 3.849409207658501| 0.3849409207658501|0.43422232330495936| 
| 5| 5.514681139649785| 0.5514681139649784| 0.6703416471647694| 
| 6| 4.890227540935781| 0.4890227540935781| 0.5515164635420178| 
| 7|4.1148083531280095|0.41148083531280094| 0.4384132796986667| 
| 8| 5.723834881155167| 0.5723834881155166| 0.6415902834329499| 
| 9| 5.559212938582014| 0.5559212938582014| 0.6816268800227596| 
| 10|0.8867335786067405| 0.8867335786067405| 0.8867335786067405| 
+----+------------------+-------------------+-------------------+ 
+0

私の悪いと清算のおかげで!あなたの答えは、UDFの理解を深めるのに役立ちました。 – tricky