2016-10-27 5 views
3

spark scala DataFrameには、(id、day、val、order)の4つの値があります。 (id、day、value_list:List(val1、val2、...、valn))ここで、val1からvalnは昇順の値で並べ替えられている新しいDataFrameを作成します。例えばSpark Scala:DataFrame列の値を順序リストに集約する

(50, 113, 1, 1), 
(50, 113, 1, 3), 
(50, 113, 2, 2), 
(51, 114, 1, 2), 
(51, 114, 2, 1), 
(51, 113, 1, 1) 

はなる:

((51,113),List(1)) 
((51,114),List(2, 1) 
((50,113),List(1, 2, 1)) 

私は近いんだけど、私は、リストにデータを集計した後に何をすべきか分かりません。出力は次のようになり

import org.apache.spark.sql.Row 

val testList = List((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1)) 
val testDF = sqlContext.sparkContext.parallelize(testList).toDF("id1", "id2", "val", "order") 

val rDD1 = testDF.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int) => ((key1, key2), List((val1, val2)))} 
val rDD2 = rDD1.reduceByKey{case (x, y) => x ++ y} 

((51,113),List((1,1))) 
((51,114),List((1,2), (2,1))) 
((50,113),List((1,3), (1,1), (2,2))) 

は、次のステップを生成するために次のようになります。

私はその後、注文int型によってそれぞれの値のリストスパーク順序を持っているかどうかはわかりません
((51,113),List((1,1))) 
((51,114),List((2,1), (1,2))) 
((50,113),List((1,1), (2,2), (1,3))) 

答えて

3

あなたは自分のRDDの上にマッピングして使用する必要がありますsortBy

scala> val df = Seq((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1)).toDF("id1", "id2", "val", "order") 
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int, val: int, order: int] 

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> val rDD1 = df.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int) => ((key1, key2), List((val1, val2)))} 
rDD1: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[10] at map at <console>:28 

scala> val rDD2 = rDD1.reduceByKey{case (x, y) => x ++ y} 
rDD2: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = ShuffledRDD[11] at reduceByKey at <console>:30 

scala> val rDD3 = rDD2.map(x => (x._1, x._2.sortBy(_._2))) 
rDD3: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[12] at map at <console>:32 

scala> rDD3.collect.foreach(println) 
((51,113),List((1,1))) 
((50,113),List((1,1), (2,2), (1,3))) 
((51,114),List((2,1), (1,2))) 
1
testDF.groupBy("id1","id2").agg(collect_list($"val")).show 
+---+---+-----------------+              
|id1|id2|collect_list(val)| 
+---+---+-----------------+ 
| 51|113|    [1]| 
| 51|114|   [1, 2]| 
| 50|113|  [1, 1, 2]| 
+---+---+-----------------+ 
関連する問題