2017-01-15 13 views
0

私は、ハイブテーブルの列を照会し、マージすることによって、構造体列のマップを構築しています。後でこれらのレコードをID列にグループ化し、これらのIDの関連マップを作成します。これは後でハイブテーブルに書き戻す前に他のデータフレームに結合されます。

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.hive.HiveContext; 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types.{StructType, StructField, StringType, MapType, ArrayType, LongType} 
import scala.collection.Map 
import scala.collection.JavaConversions._ 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.functions._ 
import org.apache.spark.rdd.RDD 

val eschema = new StructType(Array(StructField("id", LongType, nullable = false), StructField("DEFINITION", MapType(StringType, StructType(List(StructField("a",LongType,true), StructField("b", StringType, true), StructField("c",StringType,true), StructField("d",StringType,true), StructField("e",StringType,true), StructField("f",StringType,true), StructField("g",StringType,true), StructField("h",StringType,true), StructField("i",StringType,true), StructField("j",StringType,true), StructField("k",StringType,true))))))) 
val etrans = sqlContext.sql("""select id, map(table.col1, named_struct("a", table.col2, "b", table.col3, "c", table.col4, "d", table.col5, "e", table.col6, "f", table.col7, "g", table.col8, "h", table.col9, "i", table.col10, "j", table.col11, "k", table.col12)) AS DEFINITION from table""") 
val aggregatedRdd: RDD[Row] = etrans.rdd.groupBy(r => r.getAs[Long]("id")).map(row => Row(row._1, row._2.map(_.getAs[Map[String, List[(String, Any)]]]("DEFINITION")).toList)) 
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, eschema) 
aggregatedDf.registerTempTable("event") 
aggregatedDf.printSchema() 
aggregatedDf.show()    

私はマッチ誤差以下に遭遇してい

ERROR Executor: Exception in task 0.0 in stage 83.0 (TID 3652) 
scala.MatchError: List(Map(qwe -> [204,,abc,,positive,False,everywhere,always_record,counter,xyz,disabled]), Map(N/A -> [20,,something,,null,null,null,null,null,null,null]), Map(xyz -> [220,,something,,positive,False,everywhere,always_record,counter,xyz,enabled])) (of class scala.collection.immutable.$colon$colon) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:201) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toCatalystImpl(CatalystTypeConverters.scala:193) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 

答えて

0

MatchError内のクラスは次のとおりです。

私は思う:これはのArrayTypeを実装タイプがある

class scala.collection.immutable.$colon$colon 

問題は、ArrayTypeがList [(String、Any)]にクラスキャストされないということです:

_.getAs[Map[String, List[(String, Any)]]]("DEFINITION")) 

あなたは(のGetAsに使用したコード)は再帰的ではなく、ただasInstanceOf [地図[....:

は、これは良い仕事かもしれませんGetMapリクエスト()の定義は、

def getMap[K, V](i: Int): scala.collection.Map[K, V] = getAs[Map[K, V]](i) 

次に、Mapの値を実行し、未知のVからList [(String、Any)]にキャストするセカンダリクラスキャストを作成できます。

関連する問題