2017-10-17 6 views
0

spark/Scala APIを使用してレコードをフラット化するロジックを実装しようとしています。マップ機能を使用しようとしています。キー列を使用してレコードをフラット化する

この問題を解決する最も簡単な方法を教えてください。 =================>

Keycol|processcode 
John |1 
Mary |8 
John |2 
John |4 
Mary |1 
Mary |7 

-

は、与えられたキーのために、私は3つのプロセスコード

入力データフレームを持っている必要があり、想定します=============

出力データフレーム - >

Keycol|processcode1|processcode2|processcode3 
john |1   |2   |4 
Mary |8   |1   |7 

答えて

1

Keycolごとの行数が同じと仮定すると、1つのアプローチは、トンになります各Keycolの配列にprocesscode凝集Oおよび個々の列に出て展開:代替アプローチの

val df = Seq(
    ("John", 1), 
    ("Mary", 8), 
    ("John", 2), 
    ("John", 4), 
    ("Mary", 1), 
    ("Mary", 7) 
).toDF("Keycol", "processcode") 

val df2 = df.groupBy("Keycol").agg(collect_list("processcode").as("processcode")) 

val numCols = df2.select(size(col("processcode"))).as[Int].first 
val cols = (0 to numCols - 1).map(i => col("processcode")(i)) 

df2.select(col("Keycol") +: cols: _*).show 

+------+--------------+--------------+--------------+ 
|Keycol|processcode[0]|processcode[1]|processcode[2]| 
+------+--------------+--------------+--------------+ 
| Mary|    8|    1|    7| 
| John|    1|    2|    4| 
+------+--------------+--------------+--------------+ 
+0

ありがとうレオ私の問題解決df2 = df.groupBy( "Keycol").gg(collect_list( "processcode")。as( "processcode"))あなたのクイックヘルプをありがとう。 –

+1

助けてくれてうれしいです。答えを受け入れることで問題を解決できますか? –

1

カップル。

SQL

df.createOrReplaceTempView("tbl") 

val q = """ 
select keycol, 
     c[0] processcode1, 
     c[1] processcode2, 
     c[2] processcode3 
    from (select keycol, collect_list(processcode) c 
      from tbl 
     group by keycol) t0 
""" 

sql(q).show 

結果

scala> sql(q).show 
+------+------------+------------+------------+ 
|keycol|processcode1|processcode2|processcode3| 
+------+------------+------------+------------+ 
| Mary|   1|   7|   8| 
| John|   4|   1|   2| 
+------+------------+------------+------------+ 

PairRDDFunctions(groupByKey)+ mapPartitions

import org.apache.spark.sql.Row 
val my_rdd = df.map{ case Row(a1: String, a2: Int) => (a1, a2) 
        }.rdd.groupByKey().map(t => (t._1, t._2.toList)) 

def f(iter: Iterator[(String, List[Int])]) : Iterator[Row] = { 
    var res = List[Row](); 
    while (iter.hasNext) { 
    val (keycol: String, c: List[Int]) = iter.next  
    res = res ::: List(Row(keycol, c(0), c(1), c(2))) 
    } 
    res.iterator 
} 

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} 
val schema = new StructType().add(
      StructField("Keycol", StringType, true)).add(
      StructField("processcode1", IntegerType, true)).add(
      StructField("processcode2", IntegerType, true)).add(
      StructField("processcode3", IntegerType, true)) 

spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show 

結果

scala> spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show 
+------+------------+------------+------------+ 
|Keycol|processcode1|processcode2|processcode3| 
+------+------------+------------+------------+ 
| Mary|   1|   7|   8| 
| John|   4|   1|   2| 
+------+------------+------------+------------+ 

明示的に指定されていない限り、すべての場合において、プロセスコードの列の値の順番はであることを覚えておいてください。

関連する問題