2017-10-12 7 views
1

「日付」がタイプtimestampで「量」がlongのスパークデータフレームがあります。日付ごとに、私は数量のためのいくつかの値を持っています。日付は昇順にソートされます。しかし、いくつかの日付が欠落しています。 カレントDF - - などの場合 火花データフレーム列に欠けている日付を記入する

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
14-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
20-09-2016 | 2 

あなたが見ることができるように、DFは2016年12月9日のようないくつか欠落している日付を持って、13-09-2016など私はのために数量フィールドに0を入れたいです結果のdfが次のようになるような欠けている日付 -

Date  | Quantity 
10-09-2016 | 1 
11-09-2016 | 2 
12-09-2016 | 0 
13-09-2016 | 0 
14-09-2016 | 0 
15-09-2016 | 0 
16-09-2016 | 1 
17-09-2016 | 0 
18-09-2016 | 0 
19-09-2016 | 0 
20-09-2016 | 2 

これに関する助言/提案は認められます。前もって感謝します。 私はscalaでコーディングしていることに注意してください。

答えて

2

コードの理解を容易にするために、この回答を少し詳細に記述しました。最適化することができます。

必要輸入有効な日付形式Iterate over dates range

def fill_dates = udf((start: String, excludedDiff: Int) => { 
    val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") 
    val fromDt = LocalDateTime.parse(start, dtFormatter) 
    (1 to (excludedDiff - 1)).map(day => { 
     val dt = fromDt.plusDays(day) 
     "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth) 
     .replaceAll(" ", "0") 
    }) 
    }) 
から採取されたUDFコード下

val date_transform = udf((date: String) => { 
    val dtFormatter = DateTimeFormatter.ofPattern("d-M-y") 
    val dt = LocalDate.parse(date, dtFormatter) 
    "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth) 
     .replaceAll(" ", "0") 
    }) 

文字列ため

import java.time.format.DateTimeFormatter 
import java.time.{LocalDate, LocalDateTime} 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.{LongType, TimestampType} 

のUDF

設定サンプルデータフレーム(df

val df = Seq(
     ("10-09-2016", 1), 
     ("11-09-2016", 2), 
     ("14-09-2016", 0), 
     ("16-09-2016", 1), 
     ("17-09-2016", 0), 
     ("20-09-2016", 2)).toDF("date", "quantity") 
     .withColumn("date", date_transform($"date").cast(TimestampType)) 
     .withColumn("quantity", $"quantity".cast(LongType)) 

df.printSchema() 
root 
|-- date: timestamp (nullable = true) 
|-- quantity: long (nullable = false) 


df.show()  
+-------------------+--------+ 
|    date|quantity| 
+-------------------+--------+ 
|2016-09-10 00:00:00|  1| 
|2016-09-11 00:00:00|  2| 
|2016-09-14 00:00:00|  0| 
|2016-09-16 00:00:00|  1| 
|2016-09-17 00:00:00|  0| 
|2016-09-20 00:00:00|  2| 
+-------------------+--------+ 

dfunionへの一時的なデータフレーム(tempDf)を作成します。

val w = Window.orderBy($"date") 
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date")) 
    .filter($"diff" > 1) // Pick date diff more than one day to generate our date 
    .withColumn("next_dates", fill_dates($"date", $"diff")) 
    .withColumn("quantity", lit("0")) 
    .withColumn("date", explode($"next_dates")) 
    .withColumn("date", $"date".cast(TimestampType)) 

tempDf.show(false) 
+-------------------+--------+----+------------------------+ 
|date    |quantity|diff|next_dates    | 
+-------------------+--------+----+------------------------+ 
|2016-09-12 00:00:00|0  |3 |[2016-09-12, 2016-09-13]| 
|2016-09-13 00:00:00|0  |3 |[2016-09-12, 2016-09-13]| 
|2016-09-15 00:00:00|0  |2 |[2016-09-15]   | 
|2016-09-18 00:00:00|0  |3 |[2016-09-18, 2016-09-19]| 
|2016-09-19 00:00:00|0  |3 |[2016-09-18, 2016-09-19]| 
+-------------------+--------+----+------------------------+ 

今組合2つのデータフレーム

val result = df.union(tempDf.select("date", "quantity")) 
    .orderBy("date") 

result.show() 
+-------------------+--------+ 
|    date|quantity| 
+-------------------+--------+ 
|2016-09-10 00:00:00|  1| 
|2016-09-11 00:00:00|  2| 
|2016-09-12 00:00:00|  0| 
|2016-09-13 00:00:00|  0| 
|2016-09-14 00:00:00|  0| 
|2016-09-15 00:00:00|  0| 
|2016-09-16 00:00:00|  1| 
|2016-09-17 00:00:00|  0| 
|2016-09-18 00:00:00|  0| 
|2016-09-19 00:00:00|  0| 
|2016-09-20 00:00:00|  2| 
+-------------------+--------+ 
関連する問題