2017-01-11 14 views
3

与えられたRowオブジェクトをjsonに変換する簡単な方法はありますか? Spark 2でRowをjsonに変換する方法Scala

は、JSON出力に全体のデータフレームの変換についてこれを見つけた: Spark Row to JSON

しかし、私はちょうどJSONに1行を変換したいです。 ここに私がしようとしているものの擬似コードがあります。

もっと正確には、私はDataframeの入力としてjsonを読んでいます。 私は主に列に基づいているが、列に収まらないすべての情報に対して1つのjsonフィールドを持つ新しい出力を生成しています。この関数を記述するための最も簡単な方法は何か

私の質問:convertRowToJson()

def convertRowToJson(row: Row): String = ??? 

def transformVenueTry(row: Row): Try[Venue] = { 
    Try({ 
    val name = row.getString(row.fieldIndex("name")) 
    val metadataRow = row.getStruct(row.fieldIndex("meta")) 
    val score: Double = calcScore(row) 
    val combinedRow: Row = metadataRow ++ ("score" -> score) 
    val jsonString: String = convertRowToJson(combinedRow) 
    Venue(name = name, json = jsonString) 
    }) 
} 

Psidomのソリューション:行だけではない、ネストされた行に1つのレベルを持っている場合

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

にのみ機能します。これはスキーマです:

StructType(
    StructField(indicator,StringType,true), 
    StructField(range, 
    StructType(
     StructField(currency_code,StringType,true), 
     StructField(maxrate,LongType,true), 
     StructField(minrate,LongType,true)),true)) 

はまた、アルテムの提案を試みたが、それはコンパイルされませんでした:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { 
    val sparkContext = sqlContext.sparkContext 
    import sparkContext._ 
    import sqlContext.implicits._ 
    import sqlContext._ 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataFrame = rowRDD.toDF() //XXX does not compile 
    dataFrame 
} 

答えて

1

json入力を読み込んでjson出力を生成する必要があります。 ほとんどのフィールドは個別に処理されますが、いくつかのjsonサブオブジェクトを保存するだけで済みます。

Sparkがデータフレームを読み込むと、レコードが行に変換されます。 Rowはjsonのような構造です。これを変換してjsonに書き出すことができます。

しかし、新しいフィールドとして使用するには、いくつかのサブjson構造を文字列に変換する必要があります。

これは次のように行うことができる。

dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address")) 

location.address着信JSONベースのデータフレームのサブJSONオブジェクトへのパスです。 address_jsonは、そのオブジェクトの列名をjsonの文字列バージョンに変換したものです。

to_jsonは、Spark 2.1で実装されています。

json4sを使用して出力jsonを生成する場合は、address_jsonをAST表現に解析する必要があります。そうでない場合、出力jsonはaddress_json部分をエスケープします。

1

基本的に、あなただけの1行を含むデータフレームを持つことができます。したがって、最初のデータフレームをフィルタ処理し、jsonに解析することができます。

+0

ご意見ありがとうございます。私はあなたが接近しようとした: DEF row2DataFrame(行:行、sqlContext:SQLContext):DATAFRAME = { ヴァルsparkContext = sqlContext.sparkContext インポートsparkContext._ インポートsqlContext.implicits._ インポートsqlContext._ ヴァルrowRDD。 RDD [Row] = sqlContext.sparkContext.makeRDD(row :: Nil) val dataFrame = rowRDD.toDF()// XXXがコンパイルされない dataFrame } コンパイルされませんでした。 –

5

あなたはJSONを地図に行オブジェクトを変換し、それを変換するgetValuesMapを使用することができます。

import scala.util.parsing.json.JSONObject 
import org.apache.spark.sql._ 

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")  
val row = df.first()   // this is an example row object 

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

convertRowToJSON(row) 
// res46: String = {"A" : 1, "B" : 2, "C" : 3} 
+0

これはうまくいった。ありがとう! –

+2

訂正: 実際はMap/Structの最初のレベルでのみ機能し、ネストされたマップでは機能しません。キーではなく値が表示されます。 –

+1

@SamiBadawiどこでネストされたマップの解決策を見つけることができましたか? –

1

JSONスキーマを持っていますが、行&にスキーマを適用する必要があるので、行は、スキーマを持っていませんJSonに変換します。ここであなたがそれを行う方法があります。

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 

def convertRowToJson(row: Row): String = { 

    val schema = StructType(
     StructField("name", StringType, true) :: 
     StructField("meta", StringType, false) :: Nil) 

     return sqlContext.applySchema(row, schema).toJSON 
} 
0

私はArtem、KiranM、Psidomの提案を組み合わせています。歩道やエラーの多くを行なったし、私は、ネストされた構造を試験し、この解決策を思い付いた:

def row2Json(row: Row, sqlContext: SQLContext): String = { 
    import sqlContext.implicits 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataframe = sqlContext.createDataFrame(rowRDD, row.schema) 
    dataframe.toJSON.first 
} 

このソリューションは、働いていたが、ドライバモードで動作している間だけ。

1

注意scalaクラスscala.util.parsing.json.JSONObjectは非推奨であり、null値をサポートしません。

@deprecated( "このクラスは削除されます。"、 "2.11.0")を

"JSONFormat.defaultFormatはnull値を処理しない" 私は同じだった

https://issues.scala-lang.org/browse/SI-5092

+0

ありがとうArnon。 Scalaでjsonサポートを近代化することについていくつかの話がありました。 –

0

問題は、正式なスキーマ(配列なし)のパーケットファイルがあり、jsonイベントを取得したいだけです。私は次のようにしましたが、正常に動作するように見えます(スパーク2.1):

import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.{DataFrame, Dataset, Row} 
import scala.util.parsing.json.JSONFormat.ValueFormatter 
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject} 

def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = { 
    schema.fields.map { 
    field => 
     try{ 
     if (field.dataType.typeName.equals("struct")){ 
      field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType]) 
     }else{ 
      field.name -> row.getAs[T](field.name) 
     } 
     }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}} 
    }.filter(xy => xy._2 != null).toMap 
} 

def convertRowToJSON(row: Row, schema: StructType): JSONObject = { 
    val m: Map[String, Any] = getValuesMap(row, schema) 
    JSONObject(m) 
} 
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
val defaultFormatter : ValueFormatter = (x : Any) => x match { 
    case s : String => "\"" + JSONFormat.quoteString(s) + "\"" 
    case jo : JSONObject => jo.toString(defaultFormatter) 
    case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
    case ja : JSONArray => ja.toString(defaultFormatter) 
    case other => other.toString 
} 

val someFile = "s3a://bucket/file" 
val df: DataFrame = sqlContext.read.load(someFile) 
val schema: StructType = df.schema 
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema)) 
関連する問題