2017-10-04 5 views
0

私はscalaでSpark 1.6を使用しています。Spark scala - ネストされたStructTypeからMapへの変換

オブジェクトを使ってElasticSearchでインデックスを作成しました。オブジェクト "params"はMap [String、Map [String、String]]として作成されました。例:その後、私は値を更新するためにElasticsearchインデックスを読み込むしようとしています

{ 
     "_index": "x", 
     "_type": "1", 
     "_id": "xxxxxxxxxxxx", 
     "_score": 1, 
     "_timestamp": 1506537199650, 
     "_source": { 
      "a": "toto", 
      "b": "tata", 
      "c": "description", 
      "params": { 
       "p1": { 
       "p1_detail": "table1" 
       }, 
       "p2": { 
       "p2_detail": "table2", 
       "p2_filter": "filter2" 
       }, 
       "p3": { 
       "p3_detail": "table3" 
       } 
      } 
     } 
    }, 

:私は次のようになり、レコードを与える

val params : Map[String, Map[String, String]] = ("p1" -> ("p1_detail" -> "table1"), "p2" -> (("p2_detail" -> "table2"), ("p2_filter" -> "filter2")), "p3" -> ("p3_detail" -> "table3")) 

スパークは、次のスキーマを使用して索引を読み取ります

|-- a: string (nullable = true) 
|-- b: string (nullable = true) 
|-- c: string (nullable = true) 
|-- params: struct (nullable = true) 
| |-- p1: struct (nullable = true) 
| | |-- p1_detail: string (nullable = true) 
| |-- p2: struct (nullable = true) 
| | |-- p2_detail: string (nullable = true) 
| | |-- p2_filter: string (nullable = true) 
| |-- p3: struct (nullable = true) 
| | |-- p3_detail: string (nullable = true) 

私の問題は、オブジェクトが構造体として読まれていることです。フィールドを管理して簡単に更新するために、私はMapを持っていますが、StructTypeにはあまり慣れていません。

私は地図としてUDFにオブジェクトを取得しようとしましたが、私は次のようなエラーがあります。私の質問

val getSubField : Map[String, Map[String, String]] => String = (params : Map[String, Map[String, String]]) => { val return_string = (params ("p1") getOrElse("p1_detail", null.asInstanceOf[String]) return_string } 

:どのように私たちは、この構造体に変換することができますが

User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(params)' due to data type mismatch: argument 1 requires map<string,map<string,string>> type, however, 'params' is of struct<p1:struct<p1_detail:string>,p2:struct<p2_detail:string,p2_filter:string>,p3:struct<p3_detail:string>> type.; 

UDFのコードスニペットをマップに?私はすでにドキュメントで利用可能なtoMapメソッドを見たことがありますが、私はスカラの初心者なので、それを使用する方法(暗黙的なパラメータにあまり慣れていない)を見つけることができません。事前に

おかげで、

+0

を使用すると、UDFのコードスニペットを追加してくださいすることができますか? –

+0

Structが期待されるMap [String、Map [String、String]]を取得しようとしているので、UDFはあまり役に立たないでしょう。 (パラメータ: "p1"): –

+0

'val getSubField:Map [String、String [String、String]] => String =(params:Map [String、String])=> { \t val return_string = )getOrElse(「p1_detail」、null.asInstanceOf [文字列]) \t私はSructTypeで「のparams」タイプを置き換えることができますが、その後、私はちょうど地図に変換する方法がわからない } ' –

答えて

0

代わりに行としてタイプを指定し、StructTypeオブジェクトとしてPARAMの種類を指定することはできません。

//Schema of parameter 
def schema:StructType = (new StructType).add("p1", (new StructType).add("p1_detail", StringType)) 
     .add("p2", (new StructType).add("p2_detail", StringType).add("p2_filter",StringType)) 
     .add("p3", (new StructType).add("p3_detail", StringType)) 

//Not allowed 
val extractVal: schema => collection.Map[Nothing, Nothing] = _.getMap(0) 

ソリューション:

// UDF example to process struct column 
val extractVal: (Row) => collection.Map[Nothing, Nothing] = _.getMap(0) 

// You would implement something similar 
    val getSubField : Map[String, Map[String, String]] => String = 
    (params : Row) => 
    { 
    val p1 = params.getAs[Row]("p1") 
    ......... 
    return null; 
    } 

私はこれが役に立てば幸い!

+0

私はそれを試みます。 –

0

次のように私は最終的にそれを解決:

def convertRowToMap[T](row : Row) : Map[String, T] = { 
    row.schema.fieldNames.filter(field => !row.isNullAt(row.fieldIndex(field))).map(field => field -> row.getAs[T](field)).toMap 
} 

/* udf that converts Row to Map */ 
    val rowToMap : Row => Map[String, Map[String, String]] = (row:Row) => { 
    val map_temp = convertRowToMap[Row](row) 

    val map_to_return = map_temp.map{case(k,v) => k -> convertRowToMap[String](v)} 

    map_to_return 
} 
    val udfrowToMap = udf(rowToMap) 
関連する問題