2016-11-28 10 views
0

からWrappedarrayをACCESするためにどのように私はこのようなデータフレームがあります。は、データフレームの地図

val x = sc.parallelize(Seq(
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "1", "o"), ("a", "1", "n"), ("b", "4", "xxx")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "1", "o"), ("a", "1", "n"), ("b", "4", "n")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "4", "o"), ("a", "1", "n"), ("b", "3", "n")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "4", "o"), ("a", "1", "n"), ("b", "3", "n")) 
    )).map(x => testSchema(x)).toDF("myArrays") 


val y = x.withColumn("myKeys", lit("b")) 

val getMap = udf((mouvements: mutable.WrappedArray[Row]) => { 
    val test = mouvements.toArray 
    .map(line => (line(0).toString, line(1).toString, line(2).toString)) 
    .groupBy(_._1) 
    .map{case (k,values) => k -> values.map(x => (x._2, x._3))} 
    test}) 


val df_with_map = y.select($"myKeys", getMap($"myArrays") as "myMaps") 
df_with_map show false 
df_with_map printSchema 

:このスキーマここ

root 
|-- myKeys: string (nullable = false) 
|-- myMaps: map (nullable = true) 
| |-- key: string 
| |-- value: array (valueContainsNull = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- _1: string (nullable = true) 
| | | |-- _2: string (nullable = true) 

+------+------------------------------------------------------------------------------+ 
|myKeys|myMaps                  | 
+------+------------------------------------------------------------------------------+ 
|b  |Map(b -> WrappedArray([1,o], [4,xxx]), a -> WrappedArray([1,o], [1,n], [1,n]))| 
|a  |Map(b -> WrappedArray([1,o], [4,n]), a -> WrappedArray([4,c], [1,n], [1,n])) | 
|a  |Map(b -> WrappedArray([4,o], [3,n]), a -> WrappedArray([4,o], [1,n], [1,n])) | 
|b  |Map(b -> WrappedArray([4,a], [3,n]), a -> WrappedArray([1,o], [1,n], [1,n])) | 
+------+------------------------------------------------------------------------------+ 

は、それを作成するためのコードです今、最初の要素が4に等しい配列の2番目の要素とmap equlasのキーをbにアクセスしたいと思います。ライン

val getMyValue = udf{(myKey: String, myMaps: Map[String, WrappedArray[Row]]) => 

    val first_val= "4" 
    val myArrays = myMaps.get(myKey) 
    val res = myArrays.get.toArray.filter{x => x.getString(0) == first_val} 
    res 
} 

val df_value = df_with_map.select(getMyValue($"myKey",$"myMaps") as "myValue") 
df_value show false 
df_value printSchema 

しかし、それは

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported 

つのエラーを返す:

を私はすでに、これがこのUDFでそれをやってみてください。この

+---+ 
|val| 
+---+ 
|xxx| 
|c | 
|o | 
|a | 
+---+ 

のような結果を持っている必要があります

val getMyValue = udf{(myKey: String, myMaps: Map[String, WrappedArray[Row]]) => 

ご存知ですか?

答えて

4

用途:

val first_val = "4" 
val df = Seq(
    ("b", Map("b" -> Seq(("1", "o"), ("4", "xxx")))) 
).toDF("myKeys", "myMaps") 

root 
|-- myKeys: string (nullable = true) 
|-- myMaps: map (nullable = true) 
| |-- key: string 
| |-- value: array (valueContainsNull = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- _1: string (nullable = true) 
| | | |-- _2: string (nullable = true) 
df.select($"myMaps".getItem("b")) 
    .as[Seq[(String, String)]] 
    .flatMap(xs => xs.filter(_._1 == first_val).map(_._2)) 

編集

df.as[(String, Map[String,Seq[(String, String)]])].flatMap { 
    case (key, map) => 
    map.getOrElse(key, Seq[(String, String)]()).filter(_._1 == first_val).map(_._2) 
} 
+0

ニースこの作品が、私は私の例があまりにも簡単だったと思います。私が持っている問題は、getItemメソッドを呼び出すことができないように、私が値を取得したいキーが選択前に分からないということです。私はそれをより包括的にするために質問を再提出するでしょう –

+0

私の問題のより良い説明を与えるために私のポストを更新します –

+0

私のDataFrameであなたのメソッドを呼び出すとき返す: org.apache.spark.sql.AnalysisException: StringType、ArrayType(StructType(StructField(_1、StringType、true));キャストできません(データ型の不一致により、キャストされた文字列型(Map >>) 、StructField(_2、StringType、true))、true)、true); それはあなたの例では動作しますが、私のためには動作しません。 –