2017-02-06 5 views
1

とRDDを返す私はsparkScala RDDを持って次のようになります。RDDスパークスカラ - データフレームの列からJSONを解析し、列

df.printSchema() 

|-- stock._id: string (nullable = true) 
|-- stock.value: string (nullable = true) 

2番目の列は、ネストされたJSON次のとおりです。

[ { ""warehouse"" : ""Type1"" , ""amount"" : ""0.0"" }, { ""warehouse"" : ""Type1"" , ""amount"" : ""25.0"" }] 

既存の2つの列だけでなく、JSONの列も含めてRDDを生成する必要があります。

_id, value , warehouse , amount 

私はカスタム関数を使用してそれを実行しようとしましたが、私は私のRDDにこの機能を適用するのに苦労し、必要な結果

import org.json4s.jackson.JsonMethods._

import org.json4s._

def extractWarehouses (value: String) { 
    val json = parse(value) 
    for { 
     JObject(warehouses) <- json 
     JField("warehouse", JString(warehouse)) <- warehouses 
     JField("amount", JDouble(amount)) <- warehouses 
    } yield (warehouse, amount) 
    } 

答えて

1

Asを取得していますあなたはvalueがjsonオブジェクトのリストを保持しているjson配列であると言います。それを展開し、個々のプロパティを以下のような列として取得する必要があります:

import org.apache.spark.sql.functions 

val flattenedDF = df.select(functions.column("_id"), functions.explode(df("value")).as("value")) 
val result = flattenedDF.select("_id", "value.warehouse", "value.amount") 
result.printSchema() 
+0

これはうまくいかないエラーorg.apache.spark.sql.AnalysisExceptionが発生しました:データ型の不一致により 'explode(value)'を解決できません:関数への入力が配列またはマップ型、StringTypeではありません。 – colly434

関連する問題