私は現在解決できない問題に直面しています。 私はSpark 1.6を使用しています。Scala - Spark - 1つの文字列を含むデータフレームを、リジッドタイプの列を持つDFに変換する方法は?
多くのフィールドを持つString JSONを含む1つの列を持つTEXT Dataframeがあります。私が唯一の文字列の列を持つDFにそれを変換するために、管理
{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":["20295930","20295930"]}
: いくつかのフィールドは、私が正しいJSONから推測しましたいくつかのスキーマに従って、文字列に配列し、ロングにいくつかの他の人を推定しなければなりませんフィールド。 正しいタイプに変換できませんでした。
希望するスキーマはdf_schemaにあります。 "value"列には、解析する必要があるString JSONが含まれています。 ここに私のコードだ:
var b = sqlContext.createDataFrame(df_txt.rdd,df_schema)
val z= {
b.select(b.columns.map(c => get_json_object(b("value"), s"$$.$c").alias(c)): _*)
}
var c = sqlContext.createDataFrame(z.rdd,df_schema)
c.show(1)
フィールド「prices_vat」の配列を文字列としてではなくdf_schema INEのような配列として理解されているので、私はこの例外で終わる:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 32, localhost): scala.MatchError: ["20295930","20295930"] (of class java.lang.String)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:159)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
してください助けて !
は、オブジェクトマッパーオプションを使用してマッピングするためにRDDを使用している: 私のコードにこれを追加するには、動作するようですか? –
あなたが来る解決策は何でも – wymeka