2017-10-24 6 views
0

I am trying to convert a CSV file to Parquet using Spark. Here is the spark-shell script I am using for the conversion:

import org.apache.spark.sql.types._; 
val sqlContext = new org.apache.spark.sql.SQLContext(sc); 
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv"); 
val schema= StructType(Array(
    StructField("CF_EmplID",  StringType,true), 
    StructField("Worker",  StringType,true), 
    StructField("CF_Sup Org Level 1",   StringType,true), 
    StructField("CF_Sup Org Level 2",   StringType,true), 
    StructField("CF_Sup Org Level 3",    StringType,true), 
    StructField("CF_Sup Org Level 4",   StringType,true), 
    StructField("Business Process Transaction", StringType,true), 
    StructField("Record Date",   StringType,true), 
    StructField("CF_Fiscal Period",     StringType,true), 
    StructField("Business Process Type",     StringType,true), 
    StructField("Business Process Reason",     StringType,true), 
    StructField("Active Status",     BooleanType,true), 
    StructField("Age Group",     StringType,true), 
    StructField("Annual Base Pay",     StringType,true), 
    StructField("Base Pay Segment",     StringType,true), 
    StructField("Compa-Ratio",     StringType,true), 
    StructField("Company",     StringType,true), 
    StructField("Compensation Grade",     BooleanType,true), 
    StructField("Contingent Worker Type",     StringType,true), 
    StructField("Cost Center",     StringType,true), 
    StructField("Current Rating",     StringType,true), 
    StructField("Employee Type",     StringType,true), 
    StructField("Ending Headcount",     IntegerType,true), 
    StructField("Ethnicity",     StringType,true), 
    StructField("Exempt",     BooleanType,true), 
    StructField("FTE",     StringType,true), 
    StructField("Gender",     StringType,true), 
    StructField("Highest Degree",     StringType,true), 
    StructField("Hire Count",     IntegerType,true), 
    StructField("Hire Year Text",     IntegerType,true), 
    StructField("Hiring Source",     StringType,true), 
    StructField("Involuntary Termination",     StringType,true), 
    StructField("Involuntary Termination Count",     IntegerType,true), 
    StructField("Is Critical Job",     BooleanType,true), 
    StructField("Is High Loss Impact Risk",     BooleanType,true), 
    StructField("Is High Potential",     BooleanType,true), 
    StructField("Is Manager",     BooleanType,true), 
    StructField("Is Retention Risk",     BooleanType,true), 
    StructField("Job Category",     StringType,true), 
    StructField("Job Code",     IntegerType,true), 
    StructField("Job Family",     IntegerType,true), 
    StructField("Job Family Group",     StringType,true), 
    StructField("Job Profile",     StringType,true), 
    StructField("Length of Service in Years including Partial Year",     StringType,true), 
    StructField("Location",     StringType,true), 
    StructField("Location - Country",     StringType,true), 
    StructField("Loss Impact",     StringType,true), 
    StructField("Management Level",     StringType,true), 
    StructField("Manager",     StringType,true), 
    StructField("Manager Count",     IntegerType,true) 
    )); 


val dataFrame = spark.createDataFrame(df.rdd, schema)   // re-apply the custom schema to the rows read from the CSV

// Parquet column names cannot contain spaces, so replace whitespace with underscores
var newDf = dataFrame
for (col <- dataFrame.columns) {
    newDf = newDf.withColumnRenamed(col, col.replaceAll("\\s", "_"))
}

newDf.write.parquet("/output_dir/parquet")

This seems fairly straightforward so far, but when I run it I hit the exception below, which looks like it is trying to parse a non-int value into an int field.

Here is the exception I am getting:

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) 
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573) 
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261) 
    ... 8 more 
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr22$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_9$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287) 
    ... 20 more 

Am I doing something wrong when applying the schema to the DataFrame? I tried the "inferSchema" option of "sqlContext.read.format", but it seems to guess the types incorrectly, which is why I am using

val dataFrame = spark.createDataFrame(df.rdd, schema) 

instead.
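Printing the schema of the DataFrame as loaded shows where the mismatch comes from: without "inferSchema" or an explicit schema, spark-csv reads every column as a string, and createDataFrame(df.rdd, schema) only re-labels the rows, it does not cast them.

df.printSchema() 
// every field is reported as string here, so the first IntegerType/BooleanType 
// column in the custom schema fails with 
// "java.lang.String is not a valid external type for schema of int" 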

+1

I'd recommend using 'inferSchema' for building it –
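(For reference, a minimal sketch of what that comment suggests – the option is simply passed to the CSV reader; the path is the one from the question:)

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true")   // let spark-csv sample the data and guess the column types 
    .load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv") 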

Answers

0

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header","true") 
    .schema(schema)   // pass the custom schema to the reader so each column is parsed into its declared type 
    .load(...); 
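A rough end-to-end version of the original script with that change (same paths and renaming loop as in the question, so treat it as a sketch rather than a drop-in fix):

import org.apache.spark.sql.types._ 

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .schema(schema)   // the reader now parses each column into the declared type 
    .load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv") 

// createDataFrame(df.rdd, schema) is no longer needed; just rename and write 
var newDf = df 
for (col <- df.columns) { 
    newDf = newDf.withColumnRenamed(col, col.replaceAll("\\s", "_")) 
} 

newDf.write.parquet("/output_dir/parquet") 

If some rows genuinely do not fit the declared types, spark-csv's option("mode", "DROPMALFORMED") should drop them instead of failing the job.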
0

Try comparing the schema of the df with your custom schema, and cast the column data types so that they match the data types of the schema columns.
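A minimal sketch of that approach, assuming the all-string DataFrame `df` and the `schema` value from the question:

import org.apache.spark.sql.functions.col 

// Cast every column of the string-typed DataFrame to the type declared 
// in the custom schema so the two schemas line up before writing Parquet. 
val casted = schema.fields.foldLeft(df) { (acc, field) => 
    acc.withColumn(field.name, col(field.name).cast(field.dataType)) 
} 

Note that a value that cannot be parsed (e.g. a non-numeric string cast to IntegerType) becomes null rather than throwing, which may or may not be what you want.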
