ウィンドウ機能とpivot
を組み合わせることができます。必要な輸入:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, concat, desc, first, lit, row_number
データ:
df = sc.parallelize([
(1, 14, "A", 1483695557), (2, 14, "B", 1486395557),
(3, 14, "C", 1488814757), (4, 14, "A", 1491489557),
(5, 14, "D", 1478446757), (6, 19, "X", 1483695557),
(7, 19, "Y", 1486395557), (8, 19, "Z", 1478446757),
(9, 19, "W", 1491489557), (10, 21, "R", 1468446757),
]).toDF(["id", "client_num", "value", "date"])
ウィンドウの定義(2.1 +構文スパーク):
w = Window.partitionBy("client_num").orderBy(desc("date"))
まず者は3つの最新のレコードをフィルタリングしてみましょう:
latest = df.withColumn("rn", row_number().over(w)).where(col("rn") <= 3)
接頭辞とを追加する:以下に示すように結果に
(latest
.withColumn("rn", concat(lit("value_"), col("rn")))
.groupBy("client_num")
.pivot("rn", ["value_{}".format(i) for i in [1, 2, 3]])
.agg(first("value")))
:
+----------+-------+-------+-------+
|client_num|value_1|value_2|value_3|
+----------+-------+-------+-------+
| 19| W| Y| X|
| 14| A| C| B|
| 21| R| null| null|
+----------+-------+-------+-------+
あなたは、ウィンドウのorderBy
句を変更することで、順序を調整することができます。
あなたがrow_number
を追加する前にデータを集約することができますユニークな値たい場合:
from pyspark.sql.functions import max
latest = (df
.groupBy("client_num", "value").agg(max("date").alias("date"))
.withColumn("rn", row_number().over(w)).where(col("rn") <= 3))
おかげで、それは動作しますが、私はまだいくつかの行に重複した値を取得します。各値は一意でなければなりません。 – Omar14