2
PySparkのデータフレームで複数の列を使用して中規模の複雑な日付の計算をしようとしています。基本的には、created_at
のタイムスタンプをフィルタリングした後の週数を表すnumber
という列があります。 PostgreSQLではinterval based on the value in a columnを掛けることができますが、SQL APIまたはPython APIを使用してPySparkでこれを行う方法を理解できないようです。ここのお手伝いが大変ありがとう!PySparkの複数の列を使った日付の算術
import datetime
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
start_date = datetime.date(2020,1,1)
my_df = sc.parallelize([
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3, metric=10),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1, metric=20),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2, metric=20),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30),
]).toDF()
# This doesn't work!
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'")
# Neither does this!
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date()
は、操作を行って、datetime
オブジェクトに文字列を変換するためにpythonでdatetime
ライブラリを使用して、日付を文字列に変換する必要になる可能solution hereありますが、それは狂ったようです。