I have to generate a Parquet file from a CSV file, and the Parquet file must carry two extra columns: the file name and the creation date. I create a DataFrame from the CSV file, add the two new columns, and then write the Parquet file with spark-csv. The problem is an exception when I try to display the timestamp from the DataFrame.
def loadCSV(sqlContext : SQLContext, pathCSV: String, separator: String, haveSchema: String): DataFrame = {
//logger.info("loadCSV. header is " + haveSchema.toString + ", inferSchema is true, pathCSV is " + pathCSV + " separator is " + separator)
sqlContext.read
.format("com.databricks.spark.csv")
.option("header", haveSchema) // Use first line of all files as header
.option("delimiter", separator)
.option("nullValue","")
.option("mode","FAILFAST")
.option("inferSchema", "true") // Automatically infer data types
.load(pathCSV)
}
def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode,header: String,delimiter:String): Unit = {
df.write
.format("com.databricks.spark.csv")
.option("header", header)
.option("delimiter",delimiter)
.option("nullValue","")
.mode(saveMode)
//by default, gzip. Other values are uncompressed, snappy, gzip, lzo. This can be changed only at SQLContext level.
//Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
//.option("codec","spark.sql.parquet.compression.codec" + compression_codec)
.parquet(pathParquet)
}
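Note that writeDataFrame2Parquet sets the com.databricks.spark.csv format and CSV options before calling .parquet(pathParquet); I am not sure whether that matters. For comparison, a plain Parquet write without those options would look roughly like this (just a sketch, not what the test below actually runs):

def writeDataFrame2ParquetPlain(df: DataFrame, pathParquet: String, saveMode: SaveMode): Unit = {
  // write the DataFrame directly as Parquet; compression comes from spark.sql.parquet.compression.codec
  df.write
    .mode(saveMode)
    .parquet(pathParquet)
}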
@Test
def testAddNewFieldsToDataFrame() : Unit = {
import org.apache.spark.sql.functions._
implicit val sqlContext = CSVtoParquetTest.sql
val pathWrite = "src/test/resources/writefileWithNewField2.parquet"
val df: DataFrame = CSVtoParquet.loadCSV(sqlContext , pathCSVWithHeader, ";", "true")
df.show()
val newDFWithName = df.withColumn("generated-parquet-file",lit("my-generated-parquet-file"))
newDFWithName.show()
val now = Calendar.getInstance().getTime()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//this is a String; you have to create a java.sql.Timestamp from this string.
val reportDate = dateFormat.format(now)
val generatedAt = lit(new Timestamp(dateFormat.parse(reportDate).getTime))
println("generatedAt is " + generatedAt)
val newDFWithNameAndDate = newDFWithName.withColumn("generated_at",generatedAt)
newDFWithNameAndDate.show()
val saveMode = SaveMode.Overwrite
//writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode,header: String,delimiter:String):
CSVtoParquet.writeDataFrame2Parquet(newDFWithNameAndDate , pathWrite, saveMode,"true",",")
//now I need to load the freshly generated parquet file back into a DataFrame to check that the two new fields are really there.
val parquetFile = sqlContext.read.parquet(pathWrite)
parquetFile.registerTempTable("FINAL")
//look for one of the newly created fields
val distinctCountGeneratedDates = sqlContext.sql("SELECT distinct count(generated_at) FROM FINAL").collect()
Assert.assertNotNull("cannot be null!", distinctCountGeneratedDates)
println("distinctCountGeneratedDates.length is " + distinctCountGeneratedDates.length)
Assert.assertTrue(distinctCountGeneratedDates.length > 0)
val generatedDates = sqlContext.sql("SELECT * FROM FINAL").collect()
Assert.assertNotNull("cannot be null",generatedDates)
Assert.assertTrue(generatedDates.length > 0)
generatedDates.foreach(println)
println("Done!")
}
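The SimpleDateFormat round trip in the test is only there to build a java.sql.Timestamp literal. Assuming the built-in current_timestamp function is acceptable (it keeps sub-second precision, unlike the formatted string), a roughly equivalent sketch over the newDFWithName DataFrame from the test would be:

import org.apache.spark.sql.functions.current_timestamp
// same idea: tag every row with the generation time as a timestamp column
val taggedDF = newDFWithName.withColumn("generated_at", current_timestamp())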
The test runs the code in local mode and generates the Parquet file. Once that file is copied into my SPARK_HOME folder, I run the following commands in the spark-shell, and they work fine until the marked one:
This is the actual code I run:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFile = sqlContext.read.parquet("my_parquet_folder")
// this command breaks the run!
parquetFile.show
scala> parquetFile.printSchema
root
|-- symbol: string (nullable = true)
|-- longname: string (nullable = true)
|-- ticker: string (nullable = true)
|-- equity_ticker: string (nullable = true)
|-- weight: string (nullable = true)
|-- id_isin: string (nullable = true)
|-- parquetFilename: string (nullable = true)
|-- generated_at: timestamp (nullable = true)
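Every column except generated_at is a plain string, so I assume selecting only the timestamp column would reproduce the failure on its own (a guess, I have not isolated it that way):

// presumably fails with the same assertion error
parquetFile.select("generated_at").show()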
The exception:
Caused by: java.lang.AssertionError: assertion failed: Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, but got a 19-byte binary.
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$$anon$3.addBinary(CatalystRowConverter.scala:269)
at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:318)
at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365)
at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
This code works in the unit test, and I do not understand why the timestamp is not displayed correctly when I run the code in local (and cluster) mode.
How can I generate this timestamp correctly as a generation-metadata field inside the Parquet file? What am I doing wrong?