2016-11-08 17 views
2

私は現在解決できない問題に直面しています。 私はSpark 1.6を使用しています。Scala - Spark - 1つの文字列を含むデータフレームを、リジッドタイプの列を持つDFに変換する方法は?

多くのフィールドを持つString JSONを含む1つの列を持つTEXT Dataframeがあります。私が唯一の文字列の列を持つDFにそれを変換するために、管理

{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":["20295930","20295930"]} 

: いくつかのフィールドは、私が正しいJSONから推測しましたいくつかのスキーマに従って、文字列に配列し、ロングにいくつかの他の人を推定しなければなりませんフィールド。 正しいタイプに変換できませんでした。

希望するスキーマはdf_schemaにあります。 "value"列には、解析する必要があるString JSONが含まれています。 ここに私のコードだ:

 var b = sqlContext.createDataFrame(df_txt.rdd,df_schema) 
    val z= { 
    b.select(b.columns.map(c => get_json_object(b("value"), s"$$.$c").alias(c)): _*) 
    } 
    var c = sqlContext.createDataFrame(z.rdd,df_schema) 
    c.show(1) 

フィールド「prices_vat」の配列を文字列としてではなくdf_schema INEのような配列として理解されているので、私はこの例外で終わる:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 32, localhost): scala.MatchError: ["20295930","20295930"] (of class java.lang.String) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:159) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 

してください助けて !

+0

は、オブジェクトマッパーオプションを使用してマッピングするためにRDDを使用している: 私のコードにこれを追加するには、動作するようですか? –

+0

あなたが来る解決策は何でも – wymeka

答えて

2

幸いスパークは、JSONデータを扱うためのいくつか組み込まれている機能を持っています

scala> val jsonRDD = sc.parallelize(
    |  """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":["20295930","20295930"]}""" :: Nil) 
jsonRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27 

scala> val df = sqlContext.read.json(jsonRDD) 
df: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<string>] 

scala> df.show 
+-------------+--------------------+--------------------+ 
|  email|    eventid|   prices_vat| 
+-------------+--------------------+--------------------+ 
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]| 
+-------------+--------------------+--------------------+ 


scala> df.printSchema 
root 
|-- email: string (nullable = true) 
|-- eventid: string (nullable = true) 
|-- prices_vat: array (nullable = true) 
| |-- element: string (containsNull = true) 

はまた、あなたがしたい場合は、彼らはそれに応じてフォーマットする必要がありますprices_vatフィールドにこれらの数字を認識するためにスパークことに注意してください:

scala> val jsonRDD2 = sc.parallelize(
    |  """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":[20295930,20295930]}""" :: Nil) 
jsonRDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:27 

scala> val df2 = sqlContext.read.json(jsonRDD2) 
df2: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>] 

scala> df2.show 
+-------------+--------------------+--------------------+ 
|  email|    eventid|   prices_vat| 
+-------------+--------------------+--------------------+ 
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]| 
+-------------+--------------------+--------------------+ 


scala> df2.printSchema 
root 
|-- email: string (nullable = true) 
|-- eventid: string (nullable = true) 
|-- prices_vat: array (nullable = true) 
| |-- element: long (containsNull = true) 

DataFrameのjsonを既にお持ちの場合は、次のようなことができます。

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> val df = sc.parallelize(
    |  """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":[20295930,20295930]}""" :: Nil).toDF("json") 
df: org.apache.spark.sql.DataFrame = [json: string] 

scala> df.show 
+--------------------+ 
|    json| 
+--------------------+ 
|{"eventid":"3bc1c...| 
+--------------------+ 


scala> val rdd = df.rdd.map{case Row(json: String) => json} 
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at map at <console>:30 

scala> val outDF = sqlContext.read.json(rdd) 
outDF: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>] 

scala> outDF.show 
+-------------+--------------------+--------------------+ 
|  email|    eventid|   prices_vat| 
+-------------+--------------------+--------------------+ 
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]| 
+-------------+--------------------+--------------------+ 
+0

私はJSONを再読み込みしたくないと言って忘れました。私は既にJSONを含む1列のテキストDFを持っています。私はdf_txtをいくつかのJSON DFに変換する必要があります – wymeka

+1

あなたは正しいです!このコードを私のコードに追加すると動作するようです:var y = df_txt.select( "value")rdd.map(r => r(0).asInstanceOf [String])collect() var o = sc.parallelize y) val r = sqlContext.read.json(o) – wymeka

+0

ええ、基になるRDDにアクセスするだけです。私のソリューションがあなたの問題を助けてくれたら、それをupvote/acceptしてください。 –

0

evan058のおかげで、私たちはこの問題に対処する方法を考え出しました。

var y= df_txt.select("value").rdd.map(r => r(0).asInstanceOf[String]).collect() 
var o = sc.parallelize(y) 
val r = sqlContext.read.json(o) 
関連する問題