2016-10-26 6 views
2

私はPython APIでSpark 2.0を使用しています。Sparkで最後の月曜日を取得

私はDateType()型の列を持つデータフレームを持っています。直近の月曜日を含むデータフレームに列を追加したいと思います。

私はこのようにそれを行うことができます。

reg_schema = pyspark.sql.types.StructType([ 
    pyspark.sql.types.StructField('AccountCreationDate', pyspark.sql.types.DateType(), True), 
    pyspark.sql.types.StructField('UserId', pyspark.sql.types.LongType(), True) 
]) 
reg = spark.read.schema(reg_schema).option('header', True).csv(path_to_file) 
reg = reg.withColumn('monday', 
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Mon', 
     reg.AccountCreationDate).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Tue', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 1)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Wed', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 2)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Thu', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 3)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Fri', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 4)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sat', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 5)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sun', 
     pyspark.sql.functions.date_sub(reg.AccountCreationDate, 6)) 
     ))))))) 

しかし、これはかなり簡単であるべき何かのために多くのコードのように思えます。これを行うためのより簡潔な方法がありますか?

答えて

0

next_dayを使用して次の日付を決定し、週を減算することができます。必要な機能は、次のようにインポートすることができます。

from pyspark.sql.functions import next_day, date_sub 

をおよびAS:最後に

def previous_day(date, dayOfWeek): 
    return date_sub(next_day(date, "monday"), 7) 

例:結果と

from pyspark.sql.functions import to_date 

df = sc.parallelize([ 
    ("2016-10-26",) 
]).toDF(["date"]).withColumn("date", to_date("date")) 

df.withColumn("last_monday", previous_day("date", "monday")) 

+----------+-----------+ 
|  date|last_monday| 
+----------+-----------+ 
|2016-10-26| 2016-10-24| 
+----------+-----------+ 
関連する問題