あなたはモジュールで、スパークSQLモジュールであり、いくつか列挙した機能をウィンドウ関数を使用することができhelp.``ます。
まずあなたがデータフレームにあなたのRDDを変換する必要があります:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
schema = StructType([
StructField("date", StringType()),
StructField("col", StructType([
StructField("customer_id", IntegerType()),
StructField("subtotal", DoubleType())
]))])
my_rdd = sc.parallelize([('2013-07', (5293, 2781.73)),('2013-07', (4257, 2059.75)),('2013-07', (32, 2009.75)),
('2013-07', (5182, 1949.8200000000002)),('2013-07', (1478, 1784.7600000000002)),
('2013-07', (1175, 1699.91)),('2013-07', (9807, 1664.9)),('2013-07', (1780, 1651.8500000000001)),
('2013-07', (11941, 1649.8000000000002)),('2013-07', (2071, 1629.8400000000001))])
df = my_rdd.toDF(schema)
df.printSchema()
root
|-- date: string (nullable = true)
|-- col: struct (nullable = true)
| |-- customer_id: integer (nullable = true)
| |-- subtotal: double (nullable = true)
今、私たちは私たちの窓枠を定義し、窓関数を適用することができます。
import pyspark.sql.functions as psf
from pyspark.sql import Window
w = Window.partitionBy("date").orderBy(psf.desc("col.subtotal"))
df = df.withColumn("rn", psf.row_number().over(w))
df.show()
+-------+--------------------+---+
| date| col| rn|
+-------+--------------------+---+
|2013-07| [5293,2781.73]| 1|
|2013-07| [4257,2059.75]| 2|
|2013-07| [32,2009.75]| 3|
|2013-07|[5182,1949.820000...| 4|
|2013-07|[1478,1784.760000...| 5|
|2013-07| [1175,1699.91]| 6|
|2013-07| [9807,1664.9]| 7|
|2013-07|[1780,1651.850000...| 8|
|2013-07|[11941,1649.80000...| 9|
|2013-07|[2071,1629.840000...| 10|
+-------+--------------------+---+
そして最後に、あなたはフィルタリングすることができます各日付のトップ5のお客様:
df.filter("rn >= 5")