2016-04-12 12 views
2

PySparkのデータフレームで複数の列を使用して中規模の複雑な日付の計算をしようとしています。基本的には、created_atのタイムスタンプをフィルタリングした後の週数を表すnumberという列があります。 PostgreSQLではinterval based on the value in a columnを掛けることができますが、SQL APIまたはPython APIを使用してPySparkでこれを行う方法を理解できないようです。ここのお手伝いが大変ありがとう!PySparkの複数の列を使った日付の算術

import datetime 
from pyspark.sql import SQLContext 
from pyspark.sql import Row 
from pyspark import SparkContext 

sc = SparkContext() 
sqlContext = SQLContext(sc) 
start_date = datetime.date(2020,1,1) 

my_df = sc.parallelize([ 
     Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1, metric=10), 
     Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2, metric=10), 
     Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3, metric=10), 
     Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1, metric=20), 
     Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2, metric=20), 
     Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7, metric=30), 
     Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8, metric=30), 
     Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9, metric=30), 
     Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30), 
    ]).toDF() 


# This doesn't work! 
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'") 
# Neither does this! 
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date() 

は、操作を行って、datetimeオブジェクトに文字列を変換するためにpythonでdatetimeライブラリを使用して、日付を文字列に変換する必要になる可能solution hereありますが、それは狂ったようです。

答えて

4

申し訳ありませんが、exprと内蔵のdate_add機能を使用して前向きに考えました。

from pyspark.sql.functions import expr, date_add 
new_df = my_df.withColumn('test', expr('date_add(created_at, number*7)')) 
filtered = new_df.filter(new_df.test > start_date) 
filtered.show() 

は、他の誰かが上の追加したい場合は/なぜ、これは、しかし、一般的な方法でどのように機能するかをいくつかの洞察を大好きです!

関連する問題