2017-09-23 5 views
1

データフレームがあります。Json ParsingがSpark UDF内で予期しない出力をスローする

そのデータフレーム内のすべての列のデータ型は文字列です。列のいくつかは、私は一人でそのjsonStringを解析し、そこから値を取得し、新しい列としてそれを追加したい

+--------+---------+--------------------------+ 
|event_id|event_key|    rights  | 
+--------+---------+--------------------------+ 
|  410|(default)|{"conditions":[{"devic...| 
+--------+---------+--------------------------+ 

をjsonStringています。私はそれをするためにジャクソンパーサーを使用しています。ここで

は「権利」の値が

{ 
"conditions": [ 
    { 
     "devices": [ 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "TABLET", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "TABLET", 
       "provider": "TELETV" 
      } 
     ], 
     "endDateTime": "2017-01-09T22:59:59.000Z", 
     "inclusiveGeoTerritories": [ 
      "DE", 
      "IT", 
      "ZZ" 
     ], 
     "mediaType": "Linear", 
     "offers": [ 
      { 
       "endDateTime": "2017-01-09T22:59:59.000Z", 
       "isRestartable": true, 
       "isRecordable": true, 
       "isCUTVable": false, 
       "recordingMode": "UNIQUE", 
       "retentionCUTV": "P7DT2H", 
       "retentionNPVR": "P2Y6M5DT12H35M30S", 
       "offerId": "MOTOGP-RACE", 
       "offerType": "IPPV", 
       "startDateTime": "2017-01-09T17:00:00.000Z" 
      } 
     ], 
     "platformName": "USA", 
     "startDateTime": "2017-01-09T17:00:00.000Z", 
     "territory": "USA" 
    } 
] 
} 

ある今、私は、既存のデータフレームに新しい列を作成します。追加する新しい列の名前は「プロバイダ」

conditions -> devices -> provider 

私はこれをデータフレームの非常に短い行にしたいと考えていました。そこで私は、UDFを作成したと私はJSON文字列を解析したいというUDFにし、そのUDF内jsonStringを保持し、文字列として値を返すように を必要とする列を渡しています

マイスパークコード:

import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.functions._ 
import org.json4s._ 
import org.json4s.jackson.JsonMethods 
import org.json4s.jackson.JsonMethods._ 


    // 
    some codes to derive base dataframe 
    // 

    val fetchProvider_udf = udf(fetchProvider _) 
    val result = df.withColumn("provider",fetchProvider_udf(col("rights"))) 
    result.select("event_id,"event_key","rights","provider").show(10) 


    def fetchProvider(jsonStr:String): String = { 

    val json = JsonMethods.parse(jsonStr) 

    val providerData = json \\ "conditions" \\"devices" \\ "provider" 

    compact(render(providerData)) 
    } 

また、ナビゲーションキーが利用できない場合はどうすれば対処できますか?例外をスローしますか? "条件"があり、 "デバイス"はそこにありますが、 "プロバイダ"キーはjson文字列にはありません。それではどうしたらいいですか?

誰かが私

期待出力助けてもらえ:

+--------+---------+-----------------------+-------------+ 
|event_id|event_key|    rights  |provider  | 
+--------+---------+-----------------------+-------------+ 
|  410|(unknown)|{"conditions":[{"devic...| TELETV | 
+--------+---------+-----------------------+-------------+ 

が、私はあなたが最初のプロバイダの値を抽出したい場合は、以下を使用する必要があり、以下の出力

+--------+---------+-----------------------+-------------------------------  ------------------------------------------------------+ 
|event_id|event_key|    rights  |              provider  | 
     +--------+---------+-----------------------+--------------------------  -----------------------------------------------------------+ 
|  410|(unknown)|{"conditions":[{"devic...| {"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"  } | 
    +--------+---------+-----------------------+-----------------------------  --------------------------------------------------------+ 
+1

してフィルタをない何らかの理由がありますsparkの 'get_json_object'を使用するには? – Mariusz

+0

任意のスカラーパーサーを使用する必要があります。 –

答えて

0

を取得していますがUDF内のコード:

(json \\ "conditions" \\"devices")[0] \\ "provider" 

現在のコードは(Mapとして)すべてのプロバイダを取得し、UDF結果として文字列に変換されます。

また、UDFで例外が発生しないようにする必要があります(ジョブ全体が失敗するため)。最も簡単な方法は、nullを返し、その後することです: - フィルタをdf.provider.isNull()

  • によってあなたが唯一の有効なエントリが保持したい場合は - あなたが調査したい場合は

    • df.provider.isNullNull()