カップル。
SQL
df.createOrReplaceTempView("tbl")
val q = """
select keycol,
c[0] processcode1,
c[1] processcode2,
c[2] processcode3
from (select keycol, collect_list(processcode) c
from tbl
group by keycol) t0
"""
sql(q).show
結果
scala> sql(q).show
+------+------------+------------+------------+
|keycol|processcode1|processcode2|processcode3|
+------+------------+------------+------------+
| Mary| 1| 7| 8|
| John| 4| 1| 2|
+------+------------+------------+------------+
PairRDDFunctions(groupByKey)+ mapPartitions
import org.apache.spark.sql.Row
val my_rdd = df.map{ case Row(a1: String, a2: Int) => (a1, a2)
}.rdd.groupByKey().map(t => (t._1, t._2.toList))
def f(iter: Iterator[(String, List[Int])]) : Iterator[Row] = {
var res = List[Row]();
while (iter.hasNext) {
val (keycol: String, c: List[Int]) = iter.next
res = res ::: List(Row(keycol, c(0), c(1), c(2)))
}
res.iterator
}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
val schema = new StructType().add(
StructField("Keycol", StringType, true)).add(
StructField("processcode1", IntegerType, true)).add(
StructField("processcode2", IntegerType, true)).add(
StructField("processcode3", IntegerType, true))
spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show
結果
scala> spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show
+------+------------+------------+------------+
|Keycol|processcode1|processcode2|processcode3|
+------+------------+------------+------------+
| Mary| 1| 7| 8|
| John| 4| 1| 2|
+------+------------+------------+------------+
明示的に指定されていない限り、すべての場合において、プロセスコードの列の値の順番はであることを覚えておいてください。
ありがとうレオ私の問題解決df2 = df.groupBy( "Keycol").gg(collect_list( "processcode")。as( "processcode"))あなたのクイックヘルプをありがとう。 –
助けてくれてうれしいです。答えを受け入れることで問題を解決できますか? –