2016-12-09 15 views
0

私はuserid(ユーザID)、day(日)の列を持つDataFrame(df)を持っています。pySpark、集計複合関数(連続するイベントの差)

私は毎日の平均時間間隔を計算することに興味があります。例えば

は、特定のユーザーのためのデータフレームは、データフレームがパンダデータフレームであれば、私はこの

import numpy as np 
np.mean(np.diff(df[df.userid==1].day)) 
などに興味量を計算することができ、この

userid  day  
1   2016-09-18   
1   2016-09-20 
1   2016-09-25  

のようなものに見えるかもしれません

しかし、これはDataFrameに何百万人ものユーザーがいるので非常に非効率的ですが、私はこれがこのようにできると信じています。

df.groupby("userid").agg({"day": lambda x: np.mean(np.diff(x))}) 

最初の問題は、np.mean(np.diff(x))を適用する前に日付をソートする必要があるため、これが正常に機能しているかわからないということです。

2番目の質問は、これは非効率です。これは、DataFrameをPandas DataFrameに変換するときにのみ行うことができるためです。

pySparkで全く同じことをする方法はありますか?

答えて

1

ウィンドウ機能が救済されています。いくつかの輸入:

from pyspark.sql.functions import col, datediff, lag 
from pyspark.sql.window import Window 

ウィンドウ定義

w = Window().partitionBy("userid").orderBy("day") 

とクエリ

(df 
    .withColumn("diff", datediff(lag("day", 1).over(w), "day")) 
    .groupBy("userid") 
    .mean("diff")) 
関連する問題