
I have to generate a parquet file from a CSV file, and the parquet file must carry two extra columns: the file name and the creation date. I load the CSV into a DataFrame with spark-csv, add the two new columns, and then write the parquet file. The question is about the exception I get when I try to show the timestamp from the resulting DataFrame.

def loadCSV(sqlContext: SQLContext, pathCSV: String, separator: String, haveSchema: String): DataFrame = {

  //logger.info("loadCSV. header is " + haveSchema.toString + ", inferSchema is true, pathCSV is " + pathCSV + " separator is " + separator)
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", haveSchema)    // use the first line of each file as the header
    .option("delimiter", separator)
    .option("nullValue", "")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")   // automatically infer data types
    .load(pathCSV)
}

def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String): Unit = {

  df.write
    .format("com.databricks.spark.csv")
    .option("header", header)
    .option("delimiter", delimiter)
    .option("nullValue", "")
    .mode(saveMode)
    // Parquet compression is gzip by default; other values are uncompressed, snappy and lzo. This can be changed only at the SQLContext level.
    // Configuration of Parquet can be done with setConf on the SQLContext or by running SET key=value commands in SQL.
    //.option("codec", "spark.sql.parquet.compression.codec" + compression_codec)
    .parquet(pathParquet)
}

@Test
def testAddNewFieldsToDataFrame(): Unit = {

  import java.sql.Timestamp
  import java.text.SimpleDateFormat
  import java.util.Calendar

  import org.apache.spark.sql.functions._

  implicit val sqlContext = CSVtoParquetTest.sql

  val pathWrite = "src/test/resources/writefileWithNewField2.parquet"

  val df: DataFrame = CSVtoParquet.loadCSV(sqlContext, pathCSVWithHeader, ";", "true")

  df.show()

  val newDFWithName = df.withColumn("generated-parquet-file", lit("my-generated-parquet-file"))
  newDFWithName.show()

  val now = Calendar.getInstance().getTime()
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  // this is a String; a java.sql.Timestamp has to be built from it
  val reportDate = dateFormat.format(now)

  val generatedAt = lit(new Timestamp(dateFormat.parse(reportDate).getTime))
  println("generatedAt is " + generatedAt)
  val newDFWithNameAndDate = newDFWithName.withColumn("generated_at", generatedAt)
  newDFWithNameAndDate.show()

  val saveMode = SaveMode.Overwrite
  //writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String)
  CSVtoParquet.writeDataFrame2Parquet(newDFWithNameAndDate, pathWrite, saveMode, "true", ",")

  // now load the freshly generated parquet file to check that the two new fields are really there
  val parquetFile = sqlContext.read.parquet(pathWrite)

  parquetFile.registerTempTable("FINAL")

  // query one of the newly created fields
  val distinctCountGeneratedDates = sqlContext.sql("SELECT distinct count(generated_at) FROM FINAL").collect()

  Assert.assertNotNull("cannot be null!", distinctCountGeneratedDates)
  println("distinctCountGeneratedDates.length is " + distinctCountGeneratedDates.length)
  Assert.assertTrue(distinctCountGeneratedDates.length > 0)

  val generatedDates = sqlContext.sql("SELECT * FROM FINAL").collect()
  Assert.assertNotNull("cannot be null", generatedDates)
  Assert.assertTrue(generatedDates.length > 0)
  generatedDates.foreach(println)
  println("Done!")

}

The test works fine: it runs the code in local mode and generates the parquet file. But when I copy that file into my SPARK_HOME folder and run the following commands in spark-shell, it breaks:

This is the actual code I run:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquetFile = sqlContext.read.parquet("my_parquet_folder")
// this command breaks the run!
parquetFile.show

scala> parquetFile.printSchema 
root 
|-- symbol: string (nullable = true) 
|-- longname: string (nullable = true) 
|-- ticker: string (nullable = true) 
|-- equity_ticker: string (nullable = true) 
|-- weight: string (nullable = true) 
|-- id_isin: string (nullable = true) 
|-- parquetFilename: string (nullable = true) 
|-- generated_at: timestamp (nullable = true) 

The exception:

Caused by: java.lang.AssertionError: assertion failed: Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, but got a 19-byte binary. 
at scala.Predef$.assert(Predef.scala:179) 
at org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$$anon$3.addBinary(CatalystRowConverter.scala:269) 
at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:318) 
at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365) 
at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405) 
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209) 
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) 
at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:168) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:88) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

This code works in the unit test, and I do not understand why the timestamp is not shown correctly when I run the code in local (and cluster) mode.

How can I correctly generate this timestamp as a creation-metadata field inside the parquet file? What am I doing wrong?

Answer


I found the solution: I had to cast the timestamp column to TimestampType. The essential idea is sketched right below; my full code, probably not optimal but working, follows after it.
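A minimal sketch of that idea (Spark 1.x API assumed; df and the "generated_at" column name are illustrative, not the project's exact ones):

// Minimal sketch, assuming a DataFrame df is already in scope; names are illustrative.
import java.sql.Timestamp

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.TimestampType

val generatedAt = new Timestamp(System.currentTimeMillis())
// Cast the literal column explicitly to TimestampType before writing it to parquet.
val withTimestamp = df.withColumn("generated_at", lit(generatedAt).cast(TimestampType))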

val now = Calendar.getInstance().getTime()

val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val reportDate = dateFormat.format(now)

val timestamp: Long = System.currentTimeMillis / 1000 // not used in the code below

val generatedAt = lit(new Timestamp(dateFormat.parse(reportDate).getTime))
println("generatedAt is " + generatedAt)

var newDFWithNameAndDate: org.apache.spark.sql.DataFrame = null

if (metadataCreationName == "") {
  println("ATTENTION! mra.generate.metadata.name should not be empty as a parameter or as a setting in the property file. Applying the default name, parquetFilename!")
  newDFWithNameAndDate = newDFWithName.withColumn("generated_at", generatedAt)
} else {
  newDFWithNameAndDate = newDFWithName.withColumn(metadataCreationName, generatedAt)
}

val schema: StructType = newDFWithNameAndDate.schema
println("schema is " + schema)
val listFromSchema: List[StructField] = schema.toList

val aSeq = mutable.MutableList[org.apache.spark.sql.Column]()

val elementsType: StringBuilder = new StringBuilder()

val numElements = listFromSchema.size
var cont = 1
// this part needs to be optimized!
for (elementFromSchema <- listFromSchema.reverse) {
  println("elementFromSchema is " + elementFromSchema + ". name: " + elementFromSchema.name + ". dataType: " + elementFromSchema.dataType)
  val aColumn: org.apache.spark.sql.Column = new org.apache.spark.sql.Column(elementFromSchema.name)
  println("aColumn is " + aColumn)
  aSeq += aColumn
  val elementDataType = elementFromSchema.dataType
  println(elementDataType)
  val myElement = elementDataType match {
    case TimestampType => "timestamp"
    case StringType => "string"
    case IntegerType => "integer"
    case _ => "string" // fall back to string for any other type; castingDataFrame also defaults unknown names to StringType
  }
  elementsType.append(myElement)
  if (cont < numElements) {
    elementsType.append(";")
    cont += 1
  }
} // for
println("elementsType is " + elementsType.toString())
var df1 = newDFWithNameAndDate.select(aSeq: _*)

df1 = CSVtoParquet.castingDataFrame(df1, elementsType.toString(), CSVtoParquet.mapString2DataType)

df1.show()

writeDataFrame2Parquet(df1, path_output_parquet, saveMode, header, separator)


def castingDataFrame(df: DataFrame, typeString: String, mapString2DataType: Map[String, DataType]): DataFrame = {

  // ";"-separated type names (e.g. "timestamp;string;string") mapped to Spark DataTypes; unknown names default to StringType
  val arrayDataTypes: Array[DataType] = typeString split ";" map { name => mapString2DataType.getOrElse(name, StringType) }

  // cast every column of df to its corresponding type, keeping the original column name
  val xprs = df.columns zip arrayDataTypes map { pairType => df.col(pairType._1) cast pairType._2 as pairType._1 }

  df.select(xprs: _*)
}
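For clarity, a small usage sketch of castingDataFrame; the contents of mapString2DataType shown here are an assumption, since the actual map is not included in this post:

// Hypothetical usage; the mapping below is assumed, the real mapString2DataType is not shown above.
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, TimestampType}

val mapString2DataType: Map[String, DataType] = Map(
  "string"    -> StringType,
  "integer"   -> IntegerType,
  "timestamp" -> TimestampType)

// Assuming df has three columns in this order, cast them to timestamp, string, string:
val casted = castingDataFrame(df, "timestamp;string;string", mapString2DataType)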


def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String): Unit = {
  logger.info("writeDataFrame2Parquet header is " + header
    + " saveMode is " + saveMode
    + " pathParquet is " + pathParquet
    + " delimiter is " + delimiter)
  df.write
    .format("com.databricks.spark.csv")
    .option("header", header)
    .option("delimiter", delimiter)
    .option("nullValue", "")
    .mode(saveMode)
    // Parquet compression is gzip by default; other values are uncompressed, snappy and lzo. This can be changed only at the SQLContext level.
    // Configuration of Parquet can be done with setConf on the SQLContext or by running SET key=value commands in SQL.
    //.option("codec", "spark.sql.parquet.compression.codec" + compression_codec)
    .parquet(pathParquet)
}
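As a side note, DataFrameWriter.parquet(path) is shorthand for format("parquet").save(path), so, as far as I can tell, the com.databricks.spark.csv format and the header/delimiter/nullValue options above have no effect on the parquet output. A minimal equivalent would look like this sketch:

// Minimal sketch: write the DataFrame as parquet without the unused spark-csv options.
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeParquet(df: DataFrame, pathParquet: String, saveMode: SaveMode): Unit = {
  df.write
    .mode(saveMode)
    .parquet(pathParquet)
}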

With this code you can generate a parquet file with the two extra metadata fields, the generated file name and the creation date:

scala> val parquetFileDF = sqlContext.read.parquet("componentes_indices_comma_output7") 
parquetFileDF: org.apache.spark.sql.DataFrame = [createdAt1: timestamp, fieldName1: string, id_isin: string, weight: string, equity_ticker: string, ticker: string, longname: string, symbol: string] 

scala> parquetFileDF show false 
16/12/01 16:59:20 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 

|createdAt1   |fieldName1                        |id_isin  |weight |equity_ticker|ticker |longname     |symbol   | 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0010310956|1,589563 |SALM NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0003028904|2,367745 |SCHA NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0010736879|2,513974 |SCHB NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|BMG7945E1057|0,444097 |SDRL NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0003053605|2,276359 |STB NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0010096985|19,105393|STL NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|LU0075646355|2,947026 |SUBC NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0010063308|12,973417|TEL NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0003078800|1,964136 |TGS NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|NO0010208051|6,068747 |YAR NO  |OBX Index|BB - IDX - KINGDOM OF NORWAY|BB,2000000205203| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|US00846U1016|   |A UN   |RIY Index|RIY Index     |BB,2001000008410| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|US0138721065|   |AA UN  |RIY Index|RIY Index     |BB,2001000008410| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|US02376R1023|   |AAL UW  |RIY Index|RIY Index     |BB,2001000008410| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|US00751Y1064|   |AAP UN  |RIY Index|RIY Index     |BB,2001000008410| 
|2016-12-01 16:59:06.0|/Users/aisidoro/Desktop/mra-csv-converter/csvpruebacsvconverter/componentes_indices_comma_output7|US0378331005|   |AAPL UW  |RIY Index|RIY Index     |BB,2001000008410| 
... 
+---------------------+-------------------------------------------------------------------------------------------------+------------+---------+-------------+---------+----------------------------+----------------+ 
only showing top 20 rows 