2017-11-14 10 views
0

私はスパークSQLウィンドウ機能を試すために、次のコードを持っている:スパークSQLウィンドウ関数の結果を理解する方法

test("spark sql time window 2") { 
    val spark = SparkSession.builder().master("local").appName("SparkSQLWindowTest").getOrCreate() 
    import spark.implicits._ 
    import org.apache.spark.sql.functions._ 
    val ds = Seq(
     SaleRecord("2017-10-11 09:01:12", 1), 
     SaleRecord("2017-10-11 09:01:18", 6), 
     SaleRecord("2017-10-11 10:11:12", 2), 
     SaleRecord("2017-10-11 10:18:13", 5), 
     SaleRecord("2017-10-11 10:22:13", 3), 
     SaleRecord("2017-10-11 10:22:22", 6), 
     SaleRecord("2017-10-11 10:34:56", 2), 
     SaleRecord("2017-10-11 10:48:22", 6), 
     SaleRecord("2017-10-11 11:52:23", 4), 
     SaleRecord("2017-10-11 12:56:24", 2)).toDS 

    val ds2 = ds.groupBy(window($"Time", "20 minutes", "9 minutes")).agg(sum("revenue")).orderBy("window.start") 
    ds2.show(truncate = false) 

    /* 
+---------------------------------------------+------------+ 
|window          |sum(revenue)| 
+---------------------------------------------+------------+ 
|[2017-10-11 08:45:00.0,2017-10-11 09:05:00.0]|7.0   | 
|[2017-10-11 08:54:00.0,2017-10-11 09:14:00.0]|7.0   | 
|[2017-10-11 09:57:00.0,2017-10-11 10:17:00.0]|2.0   | 
|[2017-10-11 10:06:00.0,2017-10-11 10:26:00.0]|16.0  | 
|[2017-10-11 10:15:00.0,2017-10-11 10:35:00.0]|16.0  | 
|[2017-10-11 10:24:00.0,2017-10-11 10:44:00.0]|2.0   | 
|[2017-10-11 10:33:00.0,2017-10-11 10:53:00.0]|8.0   | 
|[2017-10-11 10:42:00.0,2017-10-11 11:02:00.0]|6.0   | 
|[2017-10-11 11:36:00.0,2017-10-11 11:56:00.0]|4.0   | 
|[2017-10-11 11:45:00.0,2017-10-11 12:05:00.0]|4.0   | 
|[2017-10-11 12:39:00.0,2017-10-11 12:59:00.0]|2.0   | 
|[2017-10-11 12:48:00.0,2017-10-11 13:08:00.0]|2.0   | 
+---------------------------------------------+------------+ 


    */ 
    } 

SaleRecordの定義は、単純なケースクラスです:

case class SaleRecord(time: String, revenue: Double) 

結果に最初の3つの行がどのように生成されるのか理解できませんでしたか?最初のウィンドウは、Windowsの機能で [2017-10-11 08:45:00.0,2017-10-11 09:05:00.0]

答えて

1
window(timeColumn, windowDuration, slideDuration=None, startTime=None) 

まず明確にするために

、平均時間テンプレット作成するwindow機能:例えば

zero = 1970-01-01 00:00:00 UTC 

[zero + startTime + slideDuration * n, zero + startTime + slideDuration * n + windowDuration) 

:そして

window('ts', '5 seconds', '3 seconds', '2 seconds') 
# is equal to : 
['1970-01-01 00:00:02', '1970-01-01 00:00:07'), 
['1970-01-01 00:00:05', '1970-01-01 00:00:10'), 
['1970-01-01 00:00:08', '1970-01-01 00:00:13'), 
['1970-01-01 00:00:11', '1970-01-01 00:00:16'), 
... 

を、各行にきみの DataFrameは、timeColumnに従って、時間テンプレットに「入る」。 1つの行は、多くのタイムテンプレットセルに属します。

最後に、すべての空のタイムテンプットセルを削除し、aggを実行します。

+0

おかげで@ zhang - トン、優秀な説明は、今すぐ 'startTime'を理解する – Tom

0

をなぜ

は、スパークは、UNIXエポックタイム1970-01-01 00:00:00 UTCからフレームをcalcualteます。 sildeの継続時間を9 minutesに設定しているため、最初のフレームにはTime列の値が含まれています。[2017-10-11 08:45:00.0,2017-10-11 09:05:00.0]です。

$ date --date="2017-10-11 08:45:00" +"%s 
1507686300 
$ echo $[1507686300%180] 
0 
+0

ありがとう@cue ...私は理解できませんでした。値[2017-10-11 08:45:00.0,2017-10-11 09:05:00.0] 'はクエリの結果.. – Tom

+0

あなたは 'Time'値を持つ2つのレコードがそのウィンドウに入っているからです。 – nabongs

関連する問題