4

特定の列でグループ化したいデータがあります。グループのローリング時間ウィンドウに基づいて一連のフィールドを集計します。ここでSparkでグループとローリング時間ウィンドウを集計する方法

は、いくつかの例のデータである:

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1), 
          Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2), 
          Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3), 
          Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3), 
          Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3), 
          Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)]) 

私は、最も早い日付で開始し、時間ウィンドウを作成し、そのグループのエントリがないと、30日があるまで延長、group_byによってグループ化します。 30日が過ぎると、次のウィンドウは、前のウィンドウにない次の行の日付から始まります。

次に、たとえば平均値がget_avgになり、最初の結果がget_firstになるように集計したいとします。

だから、この例の出力は次のようになります。

group_by first date of window get_avg get_first 
group1  2016-01-01    5  1 
group2  2016-02-01    20  3 
group2  2016-04-02    8  4 

編集:申し訳ありませんが、私は私の質問が正しく指定されていません実現。私は実際には30日間の休止後に終了するウィンドウが欲しい。私はそれに応じて例のgroup2部分を修正しました。

答えて

9

改訂答え

あなたは、単純なウィンドウ関数は、ここにトリックを使用することができます。輸入品の束:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_ 
from pyspark.sql.window import Window 

ウィンドウ定義:

w = Window.partitionBy("group_by").orderBy("date") 

キャストdateDateTypeへ:

df_ = df.withColumn("date", col("date").cast("date")) 

次式の定義:

# Difference from the previous record or 0 if this is the first one 
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0)) 

# 0 if diff <= 30, 1 otherwise 
indicator = (diff > 30).cast("integer") 

# Cumulative sum of indicators over the window 
subgroup = sum_(indicator).over(w).alias("subgroup") 

はを追加テーブルへの式:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg") 
+--------+--------+------------+ 
|group_by|subgroup|avg(get_avg)| 
+--------+--------+------------+ 
| group1|  0|   5.0| 
| group2|  0|  20.0| 
| group2|  1|   8.0| 
+--------+--------+------------+ 

firstは集計と意味がありませんが、列が単調に増加している場合は、minを使用することができます。それ以外の場合は、ウィンドウ関数も使用する必要があります。

Spark 2.1を使用してテストされました。以前のSparkリリースで使用する場合、サブクエリとWindowインスタンスが必要な場合があります。スパーク2.0以降

元答え(指定した範囲では関係ありません)

あなたはa window function使用することができる必要があります:1以上の時間に

Bucketize行を列にタイムスタンプを指定したウィンドウ。ウィンドウの開始はインクルーシブですが、ウィンドウの終了は排他的です。 12:05は[12:05,12:10]のウィンドウに表示されますが、[12:00,12:05]には表示されません。それはタイムゾーンに来るとき

from pyspark.sql.functions import window 

df.groupBy(window("date", windowDuration="30 days")).count() 

いますが、結果から見ることができ、

+---------------------------------------------+-----+ 
|window          |count| 
+---------------------------------------------+-----+ 
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 | 
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 | 
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 | 
+---------------------------------------------+-----+ 

あなたは少し注意する必要があります。

関連する問題