2017-08-06 10 views
2

私はこのようになり、データを持っている:Pyspark - タイムスタンプ値にlagとrangeBetween関数を使用するにはどうすればよいですか?

userid,eventtime,location_point 
4e191908,2017-06-04 03:00:00,18685891 
4e191908,2017-06-04 03:04:00,18685891 
3136afcb,2017-06-04 03:03:00,18382821 
661212dd,2017-06-04 03:06:00,80831484 
40e8a7c3,2017-06-04 03:12:00,18825769 

私は同じlocation_pointで5分ウィンドウ内の2以上useridがある場合にtrueをマークし、新たなブール列を追加したいと思います。私はuseridで仕切られた窓をオーバールックアップするlag機能を使用してのアイデアを持っていたし、現在のタイムスタンプと次の5分の間の範囲で:私が使用している場合

from pyspark.sql import functions as F 
from pyspark.sql import Window as W 
from pyspark.sql.functions import col 

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5)) 

lastURN = F.lag(col("location_point"), 1).over(windowSpec) 
visitCheck = (last_location_point == output.location_pont) 
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check") 

このコードは、私に解析例外を与えていますRangeBetween機能:

AnalysisException:CURRENT ROW 1500 BETWEEN u'Windowフレーム範囲以下は、PRECEDING を前後1 1との間に必要なフレームROWSと一致する必要があります。

この問題に対処する方法はありますか?

答えて

1

を考えると、あなたのデータ: のは、秒単位でタイムスタンプを持つ列を追加してみましょう:

df = df.withColumn('timestamp',df_taf.eventtime.astype('Timestamp').cast("long")) 
df.show() 

+--------+-------------------+--------------+----------+ 
| userid|   eventtime|location_point| timestamp| 
+--------+-------------------+--------------+----------+ 
|4e191908|2017-06-04 03:00:00|  18685891|1496545200| 
|4e191908|2017-06-04 03:04:00|  18685891|1496545440| 
|3136afcb|2017-06-04 03:03:00|  18382821|1496545380| 
|661212dd|2017-06-04 03:06:00|  80831484|1496545560| 
|40e8a7c3|2017-06-04 03:12:00|  18825769|1496545920| 
|4e191908|2017-06-04 03:11:30|  18685891|1496545890| 
+--------+-------------------+--------------+----------+ 

を今、聞かせてのをあなたはlocationでパーティション場合は、ウィンドウ関数と同様のものを試すことができます

location_pointによるパーティションと、タイムスタンプによる順序と、-300秒と現在の時間との間の範囲を有するウィンドウ関数を定義する。私たちは、このウィンドウ内の要素の数をカウントし、名前の欄に、これらのデータを入れて「in_5_minを発生箇所」することができます

w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0) 
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w)) 
df.show() 

+--------+-------------------+--------------+----------+--------------------+ 
| userid|   eventtime|location_point| timestamp|occurrences_in_5_min| 
+--------+-------------------+--------------+----------+--------------------+ 
|40e8a7c3|2017-06-04 03:12:00|  18825769|1496545920|     1| 
|3136afcb|2017-06-04 03:03:00|  18382821|1496545380|     1| 
|661212dd|2017-06-04 03:06:00|  80831484|1496545560|     1| 
|4e191908|2017-06-04 03:00:00|  18685891|1496545200|     1| 
|4e191908|2017-06-04 03:04:00|  18685891|1496545440|     2| 
|4e191908|2017-06-04 03:11:30|  18685891|1496545890|     1| 
+--------+-------------------+--------------+----------+--------------------+ 

はあなたが真で目的の列を追加することができ出現箇所の数が厳密に1以上である場合特定の場所で最後の5分間:

add_bool = udf(lambda col : True if col>1 else False, BooleanType()) 
df = df.withColumn('already_occured',add_bool('occurrences_in_5_min')) 
df.show() 

+--------+-------------------+--------------+----------+--------------------+---------------+ 
| userid|   eventtime|location_point| timestamp|occurrences_in_5_min|already_occured| 
+--------+-------------------+--------------+----------+--------------------+---------------+ 
|40e8a7c3|2017-06-04 03:12:00|  18825769|1496545920|     1|   false| 
|3136afcb|2017-06-04 03:03:00|  18382821|1496545380|     1|   false| 
|661212dd|2017-06-04 03:06:00|  80831484|1496545560|     1|   false| 
|4e191908|2017-06-04 03:00:00|  18685891|1496545200|     1|   false| 
|4e191908|2017-06-04 03:04:00|  18685891|1496545440|     2|   true| 
|4e191908|2017-06-04 03:11:30|  18685891|1496545890|     1|   false| 
+--------+-------------------+--------------+----------+--------------------+---------------+ 
1

rangeBetweenlagのような非集計機能では意味がありません。 lagはoffset引数で指定された特定の行を常に取るので、frameを指定することは無意味です。

あなたは標準の集合体でグループ化windowを使用することができ、時系列の上にウィンドウを取得するには、次の

from pyspark.sql.functions import window, countDistinct 


(df 
    .groupBy("location_point", window("eventtime", "5 minutes")) 
    .agg(countDistinct("userid"))) 

あなたはスライド継続時間を変更するための複数の引数を追加することができます。

windowSpec = (W.partitionBy(col("location")) 
    .orderBy(col("eventtime").cast("timestamp").cast("long")) 
    .rangeBetween(0, days(5))) 


df.withColumn("id_count", countDistinct("userid").over(windowSpec)) 
関連する問題