私は 'user_name'、 'mac'、 'dayte'(day)のデータセットを持っています。 GROUP BY ['user_name']としたいと思います。そのGROUP BYのために、 'dayte'を使用して30日間回転するWINDOWを作成します。そのローリング30日間で、私は「mac」の別個の数を数えたいと思う。それを私のデータフレームに追加してください。データのサンプル。Pythonローリング30日間GROUP BY By Count Distinct String
user_name mac dayte
0 001j 7C:D1 2017-09-15
1 0039711 40:33 2017-07-25
2 0459 F0:79 2017-08-01
3 0459 F0:79 2017-08-06
4 0459 F0:79 2017-08-31
5 0459 78:D7 2017-09-08
6 0459 E0:C7 2017-09-16
7 133833 18:5E 2017-07-27
8 133833 F4:0F 2017-07-31
9 133833 A4:E4 2017-08-07
私はこれをPANDAsデータフレームで解決しようとしました。
df['ct_macs'] = df.groupby(['user_name']).rolling('30d', on='dayte').mac.apply(lambda x:len(x.unique()))
しかし
Exception: cannot handle a non-unique multi-index!
エラーを受け取った私はPySparkで試してみましたが、同様にエラーを受け取りました。
from pyspark.sql import functions as F
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
#convert string timestamp to timestamp type
df= df.withColumn('dayte', df.dayte.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = Window.partitionBy("user_name").orderBy("dayte").rangeBetween(-days(30), 0)
df= df.select("user_name","mac","dayte",F.size(F.denseRank().over(w).alias("ct_mac")))
しかし、エラーに
Py4JJavaError: An error occurred while calling o464.select.
: org.apache.spark.sql.AnalysisException: Window function dense_rank does not take a frame specification.;
を受けた私はまた
df= df.select("user_name","dayte",F.countDistinct(col("mac")).over(w).alias("ct_mac"))
をしようとしたが、それは(ウィンドウ内の個別カウント)スパークでは、明らかに、サポートされていません。 私は純粋なSQLのアプローチにオープンしています。どちらかのMySQLまたはSQL ServerでPythonまたはSparkを好むでしょう。