2015-10-19 9 views
9

Spark SQL DataFrameにデータがあり、取得しようとしているのは指定された日付範囲の現在の行より前のすべての行です。たとえば、私は7日前から与えられた行の前にすべての行を持っていると思います。ここスパークウィンドウ関数 - rangeBetween dates

Window \ 
    .partitionBy('id') \ 
    .orderBy('start') 

と問題が来る:私はWindow Functionのように使用する必要があります考え出しました。私はrangeBetweenを7日間持っていたいが、Sparkの文書には何も見つかりませんでした。スパークはそのようなオプションを提供しますか?今の私はちょうどで先行するすべての行を取得しています:

.rowsBetween(-sys.maxsize, 0) 

が、何かを達成したいと思います。誰もが、私は非常に感謝されるだろう、この1に私を助けることができれば

.rangeBetween("7 days", 0) 

を。前もって感謝します!

答えて

21

私が知る限り、スパークとハイブのどちらでも直接には可能ではありません。どちらもORDER BY句をRANGEと共に使用する必要があります。私が見つけた最も近いものはタイムスタンプへの変換と秒単位での操作です。 prettyから極東

w = (Window() 
    .partitionBy(col("id")) 
    .orderBy(col("start").cast("timestamp").cast("long")) 
    .rangeBetween(-days(7), 0)) 

df.select(col("*"), mean("some_value").over(w).alias("mean")).show() 

## +---+----------+----------+------------------+ 
## | id|  start|some_value|    mean| 
## +---+----------+----------+------------------+ 
## | 1|2015-01-01|  20.0|    20.0| 
## | 1|2015-01-06|  10.0|    15.0| 
## | 1|2015-01-07|  25.0|18.333333333333332| 
## | 1|2015-01-12|  30.0|21.666666666666668| 
## | 2|2015-01-01|  5.0|    5.0| 
## | 2|2015-01-03|  30.0|    17.5| 
## | 2|2015-02-01|  20.0|    20.0| 
## +---+----------+----------+------------------+ 

が、動作します:

from pyspark.sql import Row 

row = Row("id", "start", "some_value") 
df = sc.parallelize([ 
    row(1, "2015-01-01", 20.0), 
    row(1, "2015-01-06", 10.0), 
    row(1, "2015-01-07", 25.0), 
    row(1, "2015-01-12", 30.0), 
    row(2, "2015-01-01", 5.0), 
    row(2, "2015-01-03", 30.0), 
    row(2, "2015-02-01", 20.0) 
]).toDF().withColumn("start", col("start").cast("date")) 

小さなヘルパーとウィンドウ定義:

from pyspark.sql.window import Window 
from pyspark.sql.functions import mean, col 


# Hive timestamp is interpreted as UNIX timestamp in seconds* 
days = lambda i: i * 86400 

最後にクエリstart列を仮定するとdateタイプが含まれています。


* Hive Language Manual, Types

+0

おかげで、私はそれが確認されていてもいい似たような、考えていました! – Nhor

関連する問題