データフレームがあります。Json ParsingがSpark UDF内で予期しない出力をスローする
そのデータフレーム内のすべての列のデータ型は文字列です。列のいくつかは、私は一人でそのjsonStringを解析し、そこから値を取得し、新しい列としてそれを追加したい
+--------+---------+--------------------------+
|event_id|event_key| rights |
+--------+---------+--------------------------+
| 410|(default)|{"conditions":[{"devic...|
+--------+---------+--------------------------+
をjsonStringています。私はそれをするためにジャクソンパーサーを使用しています。ここで
は「権利」の値が
{
"conditions": [
{
"devices": [
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "IOS",
"type": "MOBILE",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "ANDROID",
"type": "MOBILE",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "IOS",
"type": "TABLET",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "ANDROID",
"type": "TABLET",
"provider": "TELETV"
}
],
"endDateTime": "2017-01-09T22:59:59.000Z",
"inclusiveGeoTerritories": [
"DE",
"IT",
"ZZ"
],
"mediaType": "Linear",
"offers": [
{
"endDateTime": "2017-01-09T22:59:59.000Z",
"isRestartable": true,
"isRecordable": true,
"isCUTVable": false,
"recordingMode": "UNIQUE",
"retentionCUTV": "P7DT2H",
"retentionNPVR": "P2Y6M5DT12H35M30S",
"offerId": "MOTOGP-RACE",
"offerType": "IPPV",
"startDateTime": "2017-01-09T17:00:00.000Z"
}
],
"platformName": "USA",
"startDateTime": "2017-01-09T17:00:00.000Z",
"territory": "USA"
}
]
}
ある今、私は、既存のデータフレームに新しい列を作成します。追加する新しい列の名前は「プロバイダ」
conditions -> devices -> provider
私はこれをデータフレームの非常に短い行にしたいと考えていました。そこで私は、UDFを作成したと私はJSON文字列を解析したいというUDFにし、そのUDF内jsonStringを保持し、文字列として値を返すように を必要とする列を渡しています
マイスパークコード:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
//
some codes to derive base dataframe
//
val fetchProvider_udf = udf(fetchProvider _)
val result = df.withColumn("provider",fetchProvider_udf(col("rights")))
result.select("event_id,"event_key","rights","provider").show(10)
def fetchProvider(jsonStr:String): String = {
val json = JsonMethods.parse(jsonStr)
val providerData = json \\ "conditions" \\"devices" \\ "provider"
compact(render(providerData))
}
また、ナビゲーションキーが利用できない場合はどうすれば対処できますか?例外をスローしますか? "条件"があり、 "デバイス"はそこにありますが、 "プロバイダ"キーはjson文字列にはありません。それではどうしたらいいですか?
誰かが私
期待出力助けてもらえ:
+--------+---------+-----------------------+-------------+
|event_id|event_key| rights |provider |
+--------+---------+-----------------------+-------------+
| 410|(unknown)|{"conditions":[{"devic...| TELETV |
+--------+---------+-----------------------+-------------+
が、私はあなたが最初のプロバイダの値を抽出したい場合は、以下を使用する必要があり、以下の出力
+--------+---------+-----------------------+------------------------------- ------------------------------------------------------+
|event_id|event_key| rights | provider |
+--------+---------+-----------------------+-------------------------- -----------------------------------------------------------+
| 410|(unknown)|{"conditions":[{"devic...| {"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV" } |
+--------+---------+-----------------------+----------------------------- --------------------------------------------------------+
してフィルタをない何らかの理由がありますsparkの 'get_json_object'を使用するには? – Mariusz
任意のスカラーパーサーを使用する必要があります。 –