2016-10-07 13 views
2

私はSparkからdynamodbテーブルを読んでいますが、このテーブルには1つのフィールドに1つのJSON文字列があり、他のフィールドには文字列があります。 JSONフィールドは読み込めますが、入れ子になったJSONフィールドは読み込めません。これはquery Json Column using dataframesの重複ではありません。質問では、JSON文字列から列を抽出する方法を説明しますが、入れ子になったJSON列は抽出しません。Sparkを使用してDynamoDB JSON文字列からネストされたJsonフィールドを抽出しますか?

import com.github.traviscrawford.spark.dynamodb._ 
val users = sqlContext.read.dynamodb("Dynamodb_table") 

users.show(1)

サンプルデータは、私はCOL1(JSON構造)とIDフィールドからいくつかのフィールドを抽出する必要

|col1              | ID | field2|field3| 
------------------------------------------------------------------------------------- 
|{"a":[{"b":"value1","x":23},{"b":value2,"x":52}],"c":"valC"}|A1 | X1 |Y1 | 

を設定します。私はどのようにJSONフィールド(col1)を解析し、hereで説明されているようにcol1からフィールド 'c'を取得することができますが、入れ子フィールドを抽出することはできません。

マイコード:私は、上記のデータフレーム上の同じget_json_objectを適用しようとすると、

val users = sqlContext.read.dynamodb("Dynamodb_table") 
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID") 

data.show(1,false) 
|a            |c |ID| 
--------------------------------------------------------- 
|[{"b":"value1","x":23},{"b":value2","x":52}...]|valC|A1| 

は今、私はすべてのnull値を取得します。

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID") 
nestedData.show(false) 

|get_json_object(a, '$.b')| c | ID| 
------------------------------------ 
|null      |valC|A1 |  

col 'a'は配列と構造体を持っているので、私も爆発しました。しかし、データフレーム 'データ'がcol/field 'a'を配列の代わりに文字列として返すので、これはうまくいきませんでした。これを解決する方法はありますか?

アップデート:JSON4sとnet.liftweb.json.parseを使用して解析を試みました。それでも役に立たなかった

case class aInfo(b: String) 
case class col1(a: Option[aInfo]), c: String) 

import net.liftweb.json.parse 
val parseJson = udf((data: String) => { 
implicit val formats = net.liftweb.json.DefaultFormats 
parse(data).extract[Data] 
}) 

val parsed = users.withColumn("parsedJSON", parseJson($"data")) 
parsed.show(1) 

これらのパーサーを使用すると、すべての値がnullになりました。

私の予想結果は:私はそうのステップによって、この手順に従ってみましょう私は、パズルのすべての必要な部分はここに既にあることを信じているデータセット

|b  |x |c | ID| 
-------------------- 
|value1|23|valC|A1 | 
|value2|52|valC|A1 | 

答えて

1

から平らな構造を取得しようとしています。あなたのデータは同等です:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

、我々はUDFを定義するために、たとえばLINQスタイルのAPIを使用することができます:

val getBs = udf((s: String) => for { 
    JString(b) <- parse(s) \ "a" \ "b" 
} yield b) 

val df = Seq((
    """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1" 
)).toDF("col1", "ID", "field2", "field3") 

スパークはリフトと同じクエリAPIを実装しjson4sを提供します

複数のフィールドを抽出する場合は、もちろんこれを拡張できます。 JSON文字列が複数のフィールド

{"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"} 

することができます持っている場合たとえば:

for { 
    JObject(a) <- parse(s) \ "a" 
    JField("b", JString(b)) <- a 
    JField("d", JInt(d)) <- a 
} yield (b, d) 

をこれは、両方のフィールドが存在しているそうでなければ、一致がないことを前提としています。結果に

val withBs = df.withColumn("b", explode(getBs($"col1"))) 

+--------------------+---+------+------+------+ 
|    col1| ID|field2|field3|  b| 
+--------------------+---+------+------+------+ 
|{"a":[{"b":"value...| A1| X1| Y1|value1| 
|{"a":[{"b":"value...| A1| X1| Y1|value2| 
+--------------------+---+------+------+------+ 

あなたの試みであることができ、このような

case class A(b: Option[String], d: Option[Int]) 

(parse(s) \ "a").extract(Seq[A]) 

UDFフィールドを抽出するためにexplodeを使用しています:あなたはXPath-like式や抽出を好むかもしれないフィールドが欠落している処理するにはaaInfoのシーケンスであると予想しているので、Liftを使用するのは間違っていますが、それはOption[aInfo]Option[Seq[aInfo]]

このように定義されたクラスは、問題なく動作するはずです。

あなたは現在のビルド(スパーク2.1.0)を使用している場合があるスキーマを必要とSPARK-17699によって導入from_json方法です:

import org.apache.spark.sql.types._ 

val bSchema = StructType(Seq(StructField("b", StringType, true))) 
val aSchema = StructField("a", ArrayType(bSchema), true) 
val cSchema = StructField("c", StringType, true) 

val schema = StructType(Seq(aSchema, cSchema)) 

がとのように適用することができます後

import org.apache.spark.sql.functions.from_json 

val parsed = df.withColumn("col1", from_json($"col1", schema)) 

通常の表記法を使用してフィールドを選択することができます:

parsed.select($"col1.a.b") 
関連する問題