2017-10-19 3 views
1

に、任意のN列を変換する - そのような何か:スパークスカラ:私は残りの列にその映画のためのさまざまなユーザーのための映画のID(最初のカラム)および評価を表す次のようなデータ構造を持っている地図

+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+ 
|movieId| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+ 
| 1580|null|null| 3.5| 5.0|null|null|null|null|null|null|null|null|null|null|null| 
| 3175|null|null|null|null|null|null|null|null|null|null|null|null|null| 5.0|null| 
| 3794|null|null|null|null|null|null|null|null|null|null|null| 3.0|null|null|null| 
| 2659|null|null|null| 3.0|null|null|null|null|null|null|null|null|null|null|null| 

私はそれが

のようなものになるように

final case class MovieRatings(movie_id: Long, ratings: Map[Long, Double])

のDataSetにこのデータフレームを変換したいです3210

[1580, [1 -> null, 2 -> null, 3 -> 3.5, 4 -> 5.0, 5 -> null, 6 -> null, 7 -> null,...]] 

これを行うことができますか?

ここのことは、ユーザー数は任意であるということです。そして、それらを単一の列に圧縮して、最初の列は元のままにします。

+2

の可能性のある複製を[2.0スパーク - DataSetにデータフレームを変換](https://stackoverflow.com/questions/40700213/spark-2 -0-convert-dataframe-to-dataset) – Pavel

+0

私はこれが重複しているとは思わないこの質問はどのように私はそれを行うのですか、そしてその質問は私がこれをしようとしていて、 Sparkをアップグレードする必要があります。この質問はチュートリアルを求めているので、話題にはなりません。 – jmarkmurphy

答えて

3

まず、あなたは、あなたがDataset[MovieRatings]にデータフレームを変換する.as[MovieRatings]を使用することができ、あなたのケースのクラスに一致するスキーマで一つにあなたのデータフレームをtranformする必要があります。

import org.apache.spark.sql.functions._ 
import spark.implicits._ 

// define a new MapType column using `functions.map`, passing a flattened-list of 
// column name (as a Long column) and column value 
val mapColumn: Column = map(df.columns.tail.flatMap(name => Seq(lit(name.toLong), $"$name")): _*) 

// select movie id and map column with names matching the case class, and convert to Dataset: 
df.select($"movieId" as "movie_id", mapColumn as "ratings") 
    .as[MovieRatings] 
    .show(false) 
1

あなたはspark.sql.functionsを使用することができます任意の列からマップを作成する.map。これは、キーと値の間でシーケンスが交互になることを期待しています。これは、Column型またはString型になります。ここでの例である:

import spark.implicits._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions 

case class Input(movieId: Int, a: Option[Double], b: Option[Double], c: Option[Double]) 

val data = Input(1, None, Option(3.5), Option(1.4)) :: 
     Input(2, Option(4.2), Option(1.34), None) :: 
     Input(3, Option(1.11), None, Option(3.32)) :: Nil 

val df = sc.parallelize(data).toDF 

// Exclude the PK column from the map 
val mapKeys = df.columns.filterNot(_ == "movieId") 

// Build the sequence of key, value, key, value, .. 
val pairs = mapKeys.map(k => Seq(lit(k), col(k))).flatten 

val mapped = df.select($"movieId", functions.map(pairs:_*) as "map") 
mapped.show(false) 

この出力生成し:

+-------+------------------------------------+ 
|movieId|map         | 
+-------+------------------------------------+ 
|1  |Map(a -> null, b -> 3.5, c -> 1.4) | 
|2  |Map(a -> 4.2, b -> 1.34, c -> null) | 
|3  |Map(a -> 1.11, b -> null, c -> 3.32)| 
+-------+------------------------------------+ 
関連する問題