1
とRDDを返す私はsparkScala RDD
を持って次のようになります。RDD
のスパークスカラ - データフレームの列からJSONを解析し、列
df.printSchema()
|-- stock._id: string (nullable = true)
|-- stock.value: string (nullable = true)
2番目の列は、ネストされたJSON
次のとおりです。
[ { ""warehouse"" : ""Type1"" , ""amount"" : ""0.0"" }, { ""warehouse"" : ""Type1"" , ""amount"" : ""25.0"" }]
既存の2つの列だけでなく、JSON
の列も含めてRDD
を生成する必要があります。
_id, value , warehouse , amount
私はカスタム関数を使用してそれを実行しようとしましたが、私は私のRDD
にこの機能を適用するのに苦労し、必要な結果
import org.json4s.jackson.JsonMethods._
import org.json4s._
def extractWarehouses (value: String) {
val json = parse(value)
for {
JObject(warehouses) <- json
JField("warehouse", JString(warehouse)) <- warehouses
JField("amount", JDouble(amount)) <- warehouses
} yield (warehouse, amount)
}
これはうまくいかないエラーorg.apache.spark.sql.AnalysisExceptionが発生しました:データ型の不一致により 'explode(value)'を解決できません:関数への入力が配列またはマップ型、StringTypeではありません。 – colly434