2017-01-05 14 views
3

地図に行を変換DATAFRAMEスカラ - スパークDATAFRAMEは - 私はスパークを持っている変数

私は[[INT、文字列]文字列、地図]

地図などの地図の変数にこれを変換する必要が

 
Level Hierarchy Code 
-------------------------- 
Level1 Hier1  1 
Level1 Hier2  2 
Level1 Hier3  3 
Level1 Hier4  4 
Level1 Hier5  5 
Level2 Hier1  1 
Level2 Hier2  2 
Level2 Hier3  3 

すなわち

Map["Level1", Map[1->"Hier1", 2->"Hier2", 3->"Hier3", 4->"Hier4", 5->"Hier5"]] 
Map["Level2", Map[1->"Hier1", 2->"Hier2", 3->"Hier3"]] 

この機能を実現するには、適切な方法を提案してください。

私の試み。それは動作しますが、あなたがmapGroupsdataframe.groupByKey("level") followeedを使用する必要が醜い

val level_code_df =master_df.select("Level","Hierarchy","Code").distinct() 
val hierarchy_names = level_code_df.select("Level").distinct().collect() 
val hierarchy_size = hierarchy_names.size 
var hierarchyMap : scala.collection.mutable.Map[String, scala.collection.mutable.Map[Int,String]] = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Int,String]]()  
for(i <- 0 to hierarchy_size.toInt-1)  
println("names:"+hierarchy_names(i)(0)) 
val name = hierarchy_names(i)(0).toString() 
val code_level_map = level_code_df.rdd.map{row => { 
if(name.equals(row.getAs[String]("Level"))){ 
Map(row.getAs[String]("Code").toInt -> row.getAs[String]("Hierarchy")) 
} else 
Map[Int, String]() 
    }}.reduce(_++_) 

    hierarchyMap = hierarchyMap + (name -> (collection.mutable.Map() ++ code_level_map))  
    }   

    }  
+1

こんにちは、自分のコードを投稿に追加しました。 –

答えて

3

。 kryoマップ符号化を含めることも忘れないでください:

case class Data(level: String, hierarhy: String, code: Int) 
val data = Seq(
Data("Level1","Hier1",1), 
Data("Level1","Hier2",2), 
Data("Level1","Hier3",3), 
Data("Level1","Hier4",4), 
Data("Level1","Hier5",5), 
Data("Level2","Hier1",1), 
Data("Level2","Hier2",2), 
Data("Level2","Hier3",3)).toDS 
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Map[Int, String]]] 

は、Spark 2.0+:

data.groupByKey(_.level).mapGroups{ 
    case (level, values) => Map(level -> values.map(v => (v.code, v.hierarhy)).toMap) 
}.collect() 
//Array[Map[String,Map[Int,String]]] = Array(Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4)), Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3))) 

がスパーク1.6+:

data.rdd.groupBy(_.level).map{ 
    case (level, values) => Map(level -> values.map(v => (v.code, v.hierarhy)).toMap) 
}.collect() 
//Array[Map[String,Map[Int,String]]] = Array(Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3)), Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4))) 
+0

お返事ありがとうございます。あなたのコードがSpark 2.0+に対応することを願っています。このコードはSpark 1.6で動作しますか? –

+0

spark-1.6の答えが更新されました。ほぼ同じ、ちょうどrddに変換する必要があります – prudenko

0

prudenkoの答えは、おそらく最も簡潔で@ - とSpark 1.6以降で動作するはずです。しかし - あなたがデータフレーム API(とないデータセット)にとどまる解決策を探しているなら、ここでは簡単なUDF使って一つだ:これはひどく実行(またはOOMになること

val mapCombiner = udf[Map[Int, String], mutable.WrappedArray[Map[Int, String]]] {_.reduce(_ ++ _)} 

val result: Map[String, Map[Int, String]] = df 
    .groupBy("Level") 
    .agg(collect_list(map($"Code", $"Hierarchy")) as "Maps") 
    .select($"Level", mapCombiner($"Maps") as "Combined") 
    .rdd.map(r => (r.getAs[String]("Level"), r.getAs[Map[Int, String]]("Combined"))) 
    .collectAsMap() 

NOTICEは、 1つのキー(値Level)には数千もの異なる値があるかもしれませんが、これをすべてドライバのメモリに集めているので、これは問題ではないでしょうし、あなたの要件は関係なく動作しません。

関連する問題