Here is the spark-shell script I'm using to convert CSV data to Parquet:
import org.apache.spark.sql.types._;
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv");
val schema = StructType(Array(
StructField("CF_EmplID", StringType,true),
StructField("Worker", StringType,true),
StructField("CF_Sup Org Level 1", StringType,true),
StructField("CF_Sup Org Level 2", StringType,true),
StructField("CF_Sup Org Level 3", StringType,true),
StructField("CF_Sup Org Level 4", StringType,true),
StructField("Business Process Transaction", StringType,true),
StructField("Record Date", StringType,true),
StructField("CF_Fiscal Period", StringType,true),
StructField("Business Process Type", StringType,true),
StructField("Business Process Reason", StringType,true),
StructField("Active Status", BooleanType,true),
StructField("Age Group", StringType,true),
StructField("Annual Base Pay", StringType,true),
StructField("Base Pay Segment", StringType,true),
StructField("Compa-Ratio", StringType,true),
StructField("Company", StringType,true),
StructField("Compensation Grade", BooleanType,true),
StructField("Contingent Worker Type", StringType,true),
StructField("Cost Center", StringType,true),
StructField("Current Rating", StringType,true),
StructField("Employee Type", StringType,true),
StructField("Ending Headcount", IntegerType,true),
StructField("Ethnicity", StringType,true),
StructField("Exempt", BooleanType,true),
StructField("FTE", StringType,true),
StructField("Gender", StringType,true),
StructField("Highest Degree", StringType,true),
StructField("Hire Count", IntegerType,true),
StructField("Hire Year Text", IntegerType,true),
StructField("Hiring Source", StringType,true),
StructField("Involuntary Termination", StringType,true),
StructField("Involuntary Termination Count", IntegerType,true),
StructField("Is Critical Job", BooleanType,true),
StructField("Is High Loss Impact Risk", BooleanType,true),
StructField("Is High Potential", BooleanType,true),
StructField("Is Manager", BooleanType,true),
StructField("Is Retention Risk", BooleanType,true),
StructField("Job Category", StringType,true),
StructField("Job Code", IntegerType,true),
StructField("Job Family", IntegerType,true),
StructField("Job Family Group", StringType,true),
StructField("Job Profile", StringType,true),
StructField("Length of Service in Years including Partial Year", StringType,true),
StructField("Location", StringType,true),
StructField("Location - Country", StringType,true),
StructField("Loss Impact", StringType,true),
StructField("Management Level", StringType,true),
StructField("Manager", StringType,true),
StructField("Manager Count", IntegerType,true)
));
val dataFrame = spark.createDataFrame(df.rdd, schema)
var newDf = dataFrame
for(col <- dataFrame.columns){
newDf = newDf.withColumnRenamed(col,col.replaceAll("\\s", "_"))
}
newDf.write.parquet("/output_dir/parquet")
This seems fairly straightforward so far, but I'm running into an exception that looks like it comes from trying to parse non-int values into int fields. Here is the exception I'm getting:
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr22$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_9$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 20 more
Am I doing something wrong when applying the schema to the DataFrame? I also tried the "inferSchema" option of "sqlContext.read.format", but it seems to infer the wrong types.
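For context on what may be happening: the CSV was read without a schema, so every column in `df` is a String, and `createDataFrame(df.rdd, schema)` re-stamps those rows with a typed schema without casting them, which is what produces "java.lang.String is not a valid external type for schema of int". One way around this may be to hand the schema to the CSV reader itself, so the String-to-Int/Boolean conversion happens during parsing. A minimal sketch, assuming a Spark 2.x session named `spark` and with the schema abbreviated to three fields for brevity:

```scala
import org.apache.spark.sql.types._

// Abbreviated schema: same pattern as the full one above, fewer fields.
val schema = StructType(Array(
  StructField("CF_EmplID", StringType, true),
  StructField("Ending Headcount", IntegerType, true),
  StructField("Is Manager", BooleanType, true)
))

// Passing the schema to the reader makes Spark parse each column into
// the declared type up front, instead of failing later when the typed
// schema is applied to rows that still hold Strings.
val typed = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv")
```

With this, the later `createDataFrame(df.rdd, schema)` step is no longer needed at all.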
Instead of
val dataFrame = spark.createDataFrame(df.rdd, schema)
I'd recommend using the built-in 'inferSchema' –
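That suggestion, sketched out (an assumption here is that the file's values are clean enough for sampling-based inference to pick the right types, which the questioner reports was not the case for them):

```scala
// With inferSchema, Spark takes an extra pass over the data to guess
// each column's type, so no hand-written StructType is needed.
val inferred = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/uploads/01ff5191-27c4-42db-a8e0-0d6594de3a5d/Worker_Snapshot_50_100000.csv")

inferred.printSchema()  // inspect the inferred types before writing Parquet
```

If the inferred types come out wrong, an explicit schema passed to the reader is the more reliable option.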