0

おはよう。ScalaでのJavaタイムスタンプの問題 - java.lang.IllegalArgumentExceptionエラーが発生しました

Spark 2.1.0(Scala 2.11.8が組み込まれています)でRDD、データフレーム、データセットのパフォーマンスを比較しています。私はhttps://data.london.gov.uk/dataset/smartmeter-energy-use-data-in-london-householdsから自由に入手できるデータをダウンロードし、後でそのスクリプトを実行しました。次のようにあなたのプレビューを与えるために、尋問のデータが見えます:

LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped 
MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 01:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 02:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 02:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 03:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 03:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 04:00:00.0000000, 0 ,ACORN-A,Affluent 

は私の比較作業を達成するために、私は時間のスパークをインポートおよび変換の異なる段階で[文字列、文字列、タイムスタンプ、ダブル、文字列、文字列]上記6つの変数のうちの1つ。私は、データフレームとデータセットにデータをマップすることに成功しましたが、RDDの点では同じことを達成することはできません。私はRDDにファイルを変換しようと毎回、私は次のエラーを取得する:

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) 
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] 

変数「DateTimeのは」は既に「YYYY-MM-DD HHのタイムスタンプ形式として表現されているので、私は非常に混乱しています: mm:ss [.fffffffff] 'となります。私はこれらの投稿(Convert Date to Timestamp in ScalaHow to convert unix timestamp to date in SparkSpark SQL: parse timestamp without seconds)を読んだが、私の必要を満たしていない。

定義されたクラス 'londonDataSchemaDS'が私のデータセット変換では動作しますが、私のRDDでは動作しないので、さらに混乱します。

これは私が使用しているスクリプトです:

import java.io.File 
import java.sql.Timestamp 

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.types.{DataTypes, StructField, StructType} 

val sparkSession = SparkSession.builder.appName("SmartData London").master("local[*]").getOrCreate() 

val LCLid = StructField("LCLid", DataTypes.StringType) 
val stdorToU = StructField("stdorToU", DataTypes.StringType) 
val DateTime = StructField("DateTime", DataTypes.TimestampType) 
val KWHhh = StructField("KWH/hh (per half hour) ", DataTypes.DoubleType) 
val Acorn = StructField("Acorn", DataTypes.StringType) 
val Acorn_grouped = StructField("Acorn_grouped", DataTypes.StringType) 

val fields = Array(LCLid,stdorToU,DateTime,KWHhh,Acorn,Acorn_grouped) 
val londonDataSchemaDF = StructType(fields) 

import sparkSession.implicits._ 

case class londonDataSchemaDS(LCLid: String, stdorToU: String, DateTime: java.sql.Timestamp, KWHhh: Double, Acorn: String, Acorn_grouped: String) 

val t0 = System.nanoTime() 

val loadFileRDD=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 
.map(_.split(",")) 
.map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).toDouble, r(4), r(5))) 

val t1 = System.nanoTime() 

val loadFileDF=sparkSession.read.schema(londonDataSchemaDF).option("header", true) 
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 

val t2=System.nanoTime() 

val loadFileDS=sparkSession.read.option("header", "true") 
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 
.withColumn("DateTime", $"DateTime".cast("timestamp")) 
.withColumnRenamed("KWH/hh (per half hour) ", "KWHhh") 
.withColumn("KWHhh", $"KWHhh".cast("double")) 
.as[londonDataSchemaDS] 

val t3 = System.nanoTime() 

loadFileRDD.take(10) 

loadFileDF.show(10, false) 
loadFileDF.printSchema() 

loadFileDS.show(10, false) 
loadFileDS.printSchema() 

println("Time Elapsed to implement RDD: " + (t1 - t0) * 1E-9 + " seconds") 
println("Time Elapsed to implement DataFrame: " + (t2 - t1) * 1E-9 + " seconds") 
println("Time Elapsed to implement Dataset: " + (t3 - t2) * 1E-9 + " seconds") 

この上の任意の助けが最も評価および/または右方向にナッジだろう。

多くのおかげで、

クリスチャン

答えて

0

私は私が間違って何をしたか知っています。私はDataFrameとDatasetの変換に巻き込まれ、ヘッダーをスキップする組み込み関数を持っていて、RDD変換プロセスからヘッダーを削除するのを忘れてしまった。

を読み取るため

val loadFileRDDwH=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv").map(_.split(",")) 

val header=loadFileRDDwH.first() 

val loadFileRDD=loadFileRDDwH.filter(_(0) != header(0)).map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).split("\\s+").mkString.toDouble, r(4), r(5))) 

ありがとう:以下の行を追加することにより

、私は(私はタイムスタンプで、フォーマットエラーを得ていた理由が説明する)RDDに私のcsvファイルを変換に成功し、ヘッダを削除し、クリスチャン

関連する問題