2017-09-29 19 views
1

私は 'user_name'、 'mac'、 'dayte'(day)のデータセットを持っています。 GROUP BY ['user_name']としたいと思います。そのGROUP BYのために、 'dayte'を使用して30日間回転するWINDOWを作成します。そのローリング30日間で、私は「mac」の別個の数を数えたいと思う。それを私のデータフレームに追加してください。データのサンプル。Pythonローリング30日間GROUP BY By Count Distinct String

user_name mac dayte 
0 001j 7C:D1 2017-09-15 
1 0039711 40:33 2017-07-25 
2 0459 F0:79 2017-08-01 
3 0459 F0:79 2017-08-06 
4 0459 F0:79 2017-08-31 
5 0459 78:D7 2017-09-08 
6 0459 E0:C7 2017-09-16 
7 133833 18:5E 2017-07-27 
8 133833 F4:0F 2017-07-31 
9 133833 A4:E4 2017-08-07 

私はこれをPANDAsデータフレームで解決しようとしました。

df['ct_macs'] = df.groupby(['user_name']).rolling('30d', on='dayte').mac.apply(lambda x:len(x.unique())) 

しかし

Exception: cannot handle a non-unique multi-index! 

エラーを受け取った私はPySparkで試してみましたが、同様にエラーを受け取りました。

from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

#convert string timestamp to timestamp type    
df= df.withColumn('dayte', df.dayte.cast('timestamp')) 
#create window by casting timestamp to long (number of seconds) 
w = Window.partitionBy("user_name").orderBy("dayte").rangeBetween(-days(30), 0) 

df= df.select("user_name","mac","dayte",F.size(F.denseRank().over(w).alias("ct_mac"))) 

しかし、エラーに

Py4JJavaError: An error occurred while calling o464.select. 
: org.apache.spark.sql.AnalysisException: Window function dense_rank does not take a frame specification.; 

を受けた私はまた

df= df.select("user_name","dayte",F.countDistinct(col("mac")).over(w).alias("ct_mac")) 

をしようとしたが、それは(ウィンドウ内の個別カウント)スパークでは、明らかに、サポートされていません。 私は純粋なSQLのアプローチにオープンしています。どちらかのMySQLまたはSQL ServerでPythonまたはSparkを好むでしょう。

答えて

0

Pyspark

ウィンドウ関数は次のように制限されている:

  • フレーム行のみによって定義することができず、列は
  • countDistinct
  • 列挙存在しない値機能はフレームで使用できません

代わりに自分のテーブルに参加することができます。

まず者は、データフレームを作成してみましょう:

joingroupByのための今
df = sc.parallelize([["001j", "7C:D1", "2017-09-15"], ["0039711", "40:33", "2017-07-25"], ["0459", "F0:79", "2017-08-01"], 
        ["0459", "F0:79", "2017-08-06"], ["0459", "F0:79", "2017-08-31"], ["0459", "78:D7", "2017-09-08"], 
        ["0459", "E0:C7", "2017-09-16"], ["133833", "18:5E", "2017-07-27"], ["133833", "F4:0F", "2017-07-31"], 
        ["133833", "A4:E4", "2017-08-07"]]).toDF(["user_name", "mac", "dayte"]) 

import pyspark.sql.functions as psf 
df.alias("left")\ 
    .join(
     df.alias("right"), 
     (psf.col("left.user_name") == psf.col("right.user_name")) 
     & (psf.col("right.dayte").between(psf.date_add("left.dayte", -30), psf.col("left.dayte"))), 
     "leftouter")\ 
    .groupBy(["left." + c for c in df.columns])\ 
    .agg(psf.countDistinct("right.mac").alias("ct_macs"))\ 
    .sort("user_name", "dayte").show() 

    +---------+-----+----------+-------+ 
    |user_name| mac|  dayte|ct_macs| 
    +---------+-----+----------+-------+ 
    |  001j|7C:D1|2017-09-15|  1| 
    | 0039711|40:33|2017-07-25|  1| 
    |  0459|F0:79|2017-08-01|  1| 
    |  0459|F0:79|2017-08-06|  1| 
    |  0459|F0:79|2017-08-31|  1| 
    |  0459|78:D7|2017-09-08|  2| 
    |  0459|E0:C7|2017-09-16|  3| 
    | 133833|18:5E|2017-07-27|  1| 
    | 133833|F4:0F|2017-07-31|  2| 
    | 133833|A4:E4|2017-08-07|  3| 
    +---------+-----+----------+-------+ 

パンダ

これはのpython3

のために働きます210
import pandas as pd 
import numpy as np 
df["mac"] = pd.factorize(df["mac"])[0] 
df.groupby('user_name').rolling('30D', on="dayte").mac.apply(lambda x: len(np.unique(x)))