2

私はpysparkでSparkSQLを使用していくつかのPostgreSQLテーブルをDataFramesに格納してから、dateというstartstopカラムに基づいていくつかの時系列を生成するクエリを作成します。parkparkのSparkSQL:時系列を生成するには?

my_tableが含まれているとします。PostgreSQLの

start  | stop  
------------------------- 
2000-01-01 | 2000-01-05 
2012-03-20 | 2012-03-23 

を、それはそれを行うことは非常に簡単です:

SELECT generate_series(start, stop, '1 day'::interval)::date AS dt FROM my_table 

、それはこのテーブルを生成します。

dt 
------------ 
2000-01-01 
2000-01-02 
2000-01-03 
2000-01-04 
2000-01-05 
2012-03-20 
2012-03-21 
2012-03-22 
2012-03-23 

が、どのように行うにそれは普通のSparkSQLを使っているのですか? UDFやいくつかのDataFrameメソッドを使用する必要がありますか?

答えて

0

この

from pyspark.sql.functions as F 
from pyspark.sql.types as T 

def timeseriesDF(start, total): 
    series = [start] 
    for i xrange(total-1): 
     series.append(
      F.date_add(series[-1], 1) 
     ) 
    return series 

df.withColumn("t_series", F.udf(
       timeseriesDF, 
       T.ArrayType() 
      ) (df.start, F.datediff(df.start, df.stop)) 
    ).select(F.explode("t_series")).show() 
+0

は、ラケッシュ、ありがとうございました。あなたのアイデアに合わせてあまり控えめな解決策を思いつきました。私はそれができるだけ少ないPythonコードでSparkSQLの構文に欲しいと思っていました。私はあなたの答えを受け入れるだろうが、私の解決策を見てください。 – pietrop

1

@Rakesh答えが正しいですが、私はあまり詳細な解決策を共有したいと考えてみ、あなたがスパークSQLからデータフレームdfがあるとします。

import datetime 
import pyspark.sql.types 
from pyspark.sql.functions import UserDefinedFunction 

# UDF 
def generate_date_series(start, stop): 
    return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]  

# Register UDF for later usage 
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType())) 

# mydf is a DataFrame with columns `start` and `stop` of type DateType() 
mydf.createOrReplaceTempView("mydf") 

spark.sql("SELECT explode(generate_date_series(start, stop)) FROM mydf").show() 
関連する問題