2017-03-16 3 views
3

だが、私はこのようになりますデータフレームがあるとしましょう爆発:データフレームスパークScalaのJSON配列

+--------------------+--------------------+--------------------------------------------------------------+ 
|    id |   Name  |              Payment| 
+--------------------+--------------------+--------------------------------------------------------------+ 
|    1 |   James |[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]| 
+--------------------+--------------------+--------------------------------------------------------------+ 

とスキーマがある:私は上記の爆発することができますどのように

ルート

|-- id: integer (nullable = true) 
|-- Name: string (nullable = true) 
|-- Payment: string (nullable = true) 

を以下のJSON配列:

+--------------------+--------------------+-------------------------------+ 
|    id |   Name  |      Payment| 
+--------------------+--------------------+-------------------------------+ 
|    1 |   James | {"@id":1, "currency":"GBP"} | 
+--------------------+--------------------+-------------------------------+ 
|    1 |   James | {"@id":2, "currency":"USD"} | 
+--------------------+--------------------+-------------------------------+ 

私は以下のような分解機能を使用しようとしていますが、機能しません。それは文字列型を分解できないというエラーを出しており、マップや配列のいずれかを期待しています。これは、スキーマが配列/マップではなく文字列であることを示しているので、意味がありますが、これを適切な形式に変換する方法がわかりません。

val newDF = dataframe.withColumn("nestedPayment", explode(dataframe.col("Payment"))) 

ご協力いただきありがとうございます。

+0

は 'Payment'ではありません'struct'ですか? – mtoto

答えて

5

の配列にJSONを解析してから、結果にexplodeを使用する必要があります(explodeは配列が必要です)。あなたはすべてのPayment値は次のことができ、同じサイズ(この場合、例えば2)の配列を表すJSONが含まれているわかっている場合

  • :(。はスパーク2.0を仮定*)これを行うには

    第一及び第二の要素のハードコード抽出、配列でそれらをラップし、爆発:

    val newDF = dataframe.withColumn("Payment", explode(array(
        get_json_object($"Payment", "$[0]"), 
        get_json_object($"Payment", "$[1]") 
    ))) 
    
  • をあなたはすべてのレコードを保証することはできません場合は2要素の配列とJSONを持っていますが、あなたあなたが最大サイズに要素を解析するために、このトリックを使用して、その結果null Sを除外することができ、これらの配列の最大長さを保証することができます:

    val maxJsonParts = 3 // whatever that number is... 
    val jsonElements = (0 until maxJsonParts) 
            .map(i => get_json_object($"Payment", s"$$[$i]")) 
    
    val newDF = dataframe 
        .withColumn("Payment", explode(array(jsonElements: _*))) 
        .where(!isnull($"Payment")) 
    
+0

感謝します!あなたはこれがどれほど幸せだったのか信じられません。 – Richard

+0

whileループでこれを行う方法はありますか?より効率的であるようです。 – Paul

+0

whileループによって達成されると予想されるパフォーマンスの改善は、それほど実証できないほど小さいでしょう。これはSparkアプリケーションであり、ランタイムは実際のDataFrame操作に支配されており、それらを構築するドライバ側のコードではないと想定できます。このような「時期尚早の最適化」は、コードを読みにくくします。 –