2017-11-16 8 views
-1

50kに近い数のソートされたRDDは、以下のとおり、 rdd:(date、(customer_id、subtotal))の形式のデータです。 core Sparkを使用して、上記のRDDで各日付のトップ5顧客を抽出するにはどうすればよいですか?コアスパークにランクを実装する上 の任意のポインタもPYSPARK-コアスパークAPIを使用して毎月トップnレコードを見つけよう

for i in orderItemsJoinSortMap.take(10): print(i)

('2013-07', (5293, 2781.73)) 
('2013-07', (4257, 2059.75)) 
('2013-07', (32, 2009.75)) 
('2013-07', (5182, 1949.8200000000002)) 
('2013-07', (1478, 1784.7600000000002)) 
('2013-07', (1175, 1699.91)) 
('2013-07', (9807, 1664.9)) 
('2013-07', (1780, 1651.8500000000001)) 
('2013-07', (11941, 1649.8000000000002)) 
('2013-07', (2071, 1629.8400000000001)) 

答えて

0

あなたはモジュールで、スパークSQLモジュールであり、いくつか列挙した機能をウィンドウ関数を使用することができhelp.``ます。

まずあなたがデータフレームにあなたのRDDを変換する必要があります:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType 
schema = StructType([ 
    StructField("date", StringType()), 
    StructField("col", StructType([ 
     StructField("customer_id", IntegerType()), 
     StructField("subtotal", DoubleType()) 
    ]))]) 
my_rdd = sc.parallelize([('2013-07', (5293, 2781.73)),('2013-07', (4257, 2059.75)),('2013-07', (32, 2009.75)), 
         ('2013-07', (5182, 1949.8200000000002)),('2013-07', (1478, 1784.7600000000002)), 
         ('2013-07', (1175, 1699.91)),('2013-07', (9807, 1664.9)),('2013-07', (1780, 1651.8500000000001)), 
         ('2013-07', (11941, 1649.8000000000002)),('2013-07', (2071, 1629.8400000000001))]) 
df = my_rdd.toDF(schema) 
df.printSchema() 

    root 
    |-- date: string (nullable = true) 
    |-- col: struct (nullable = true) 
    | |-- customer_id: integer (nullable = true) 
    | |-- subtotal: double (nullable = true) 

今、私たちは私たちの窓枠を定義し、窓関数を適用することができます。

import pyspark.sql.functions as psf 
from pyspark.sql import Window 
w = Window.partitionBy("date").orderBy(psf.desc("col.subtotal")) 
df = df.withColumn("rn", psf.row_number().over(w)) 
df.show() 

    +-------+--------------------+---+ 
    | date|     col| rn| 
    +-------+--------------------+---+ 
    |2013-07|  [5293,2781.73]| 1| 
    |2013-07|  [4257,2059.75]| 2| 
    |2013-07|  [32,2009.75]| 3| 
    |2013-07|[5182,1949.820000...| 4| 
    |2013-07|[1478,1784.760000...| 5| 
    |2013-07|  [1175,1699.91]| 6| 
    |2013-07|  [9807,1664.9]| 7| 
    |2013-07|[1780,1651.850000...| 8| 
    |2013-07|[11941,1649.80000...| 9| 
    |2013-07|[2071,1629.840000...| 10| 
    +-------+--------------------+---+ 

そして最後に、あなたはフィルタリングすることができます各日付のトップ5のお客様:

df.filter("rn >= 5")