2017-08-17 10 views
1

私はSparkRデータフレームを持っていたグループの最後の値を取得する:スパーク

#Create R data.frame 
custId <- c(rep(1001, 5), rep(1002, 3), 1003) 
date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01') 
desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New') 
newcust <- c(1,1,0,1,0,1,0,0,1) 
df <- data.frame(custId, date, desc, newcust) 

#Create SparkR DataFrame  
df <- createDataFrame(df) 
display(df) 
     custId| date | desc | newcust 
     -------------------------------------- 
     1001 | 2013-08-01| New | 1 
     1001 | 2014-01-01| New | 1 
     1001 | 2014-02-01| Good | 0 
     1001 | 2014-03-01| New | 1 
     1001 | 2014-04-01| Bad | 0 
     1002 | 2014-02-01| New | 1 
     1002 | 2014-03-01| Good | 0 
     1002 | 2014-04-01| Good | 0 
     1003 | 2014-04-01| New | 1 

newcustは新しい顧客に新しいcustIdが表示されるすべての時間を示し、または同じcustIddesc戻しますと「新しい場合'私が得たいのは、各グループごとに最初のdateを維持しながら、newcustの各グループの最後のdescの値です。以下は取得したいDataFrameです。スパークでどうすればいいですか? PySparkまたはSparkRのいずれかのコードが動作します。

#What I want 
custId| date | newcust | finaldesc 
---------------------------------------------- 
1001 | 2013-08-01| 1  | New 
1001 | 2014-01-01| 1  | Good 
1001 | 2014-03-01| 1  | Bad 
1002 | 2014-02-01| 1  | Good 
1003 | 2014-04-01| 1  | New 

答えて

1

私はsparkRについて知らないので、私はpysparkで答えます。 これはウィンドウ関数を使用して実現できます。

まず、あなたは、累積合計がトリックを行います計算、newcustは、新しいグループの始まりであることを1に等しいすべての行をしたい、のは「newcustのグループ化」を定義してみましょう:

from pyspark.sql import Window 
import pyspark.sql.functions as psf 

w1 = Window.partitionBy("custId").orderBy("date") 
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1)) 

+------+----------+----+-------+--------+ 
|custId|  date|desc|newcust|subgroup| 
+------+----------+----+-------+--------+ 
| 1001|2013-08-01| New|  1|  1| 
| 1001|2014-01-01| New|  1|  2| 
| 1001|2014-02-01|Good|  0|  2| 
| 1001|2014-03-01| New|  1|  3| 
| 1001|2014-04-01| Bad|  0|  3| 
| 1002|2014-02-01| New|  1|  1| 
| 1002|2014-03-01|Good|  0|  1| 
| 1002|2014-04-01|Good|  0|  1| 
| 1003|2014-04-01| New|  1|  1| 
+------+----------+----+-------+--------+ 

w2 = Window.partitionBy("custId", "subgroup") 
df2 = df1.withColumn("first_date", psf.min("date").over(w2)) 

+------+----------+----+-------+--------+----------+ 
|custId|  date|desc|newcust|subgroup|first_date| 
+------+----------+----+-------+--------+----------+ 
| 1001|2013-08-01| New|  1|  1|2013-08-01| 
| 1001|2014-01-01| New|  1|  2|2014-01-01| 
| 1001|2014-02-01|Good|  0|  2|2014-01-01| 
| 1001|2014-03-01| New|  1|  3|2014-03-01| 
| 1001|2014-04-01| Bad|  0|  3|2014-03-01| 
| 1002|2014-02-01| New|  1|  1|2014-02-01| 
| 1002|2014-03-01|Good|  0|  1|2014-02-01| 
| 1002|2014-04-01|Good|  0|  1|2014-02-01| 
| 1003|2014-04-01| New|  1|  1|2014-04-01| 
+------+----------+----+-------+--------+----------+ 

最後に、我々は最後の行を維持したい(日付順)すべてのの各subgroupについては、我々は最初の日付を維持したいです:ここでは

w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date")) 
df3 = df2.withColumn(
    "rn", 
    psf.row_number().over(w3) 
).filter("rn = 1").select(
    "custId", 
    psf.col("first_date").alias("date"), 
    "desc" 
) 

+------+----------+----+ 
|custId|  date|desc| 
+------+----------+----+ 
| 1001|2013-08-01| New| 
| 1001|2014-01-01|Good| 
| 1001|2014-03-01| Bad| 
| 1002|2014-02-01|Good| 
| 1003|2014-04-01| New| 
+------+----------+----+ 
0

はSparkRでマリーのコード@です:

w1 <- orderBy(windowPartitionBy('custId'), df$date) 
df1 <- withColumn(df, "subgroup", over(sum(df$newcust), w1)) 

w2 <- windowPartitionBy("custId", "subgroup") 
df2 <- withColumn(df1, "first_date", over(min(df1$date), w2)) 

w3 <- orderBy(windowPartitionBy("custId", "subgroup"), desc(df$date)) 
df3 <- withColumn(df2, "rn", over(row_number(), w3)) 
df3 <- select(filter(df3, df3$rn == 1), "custId", "first_date", "desc") 
df3 <- withColumnRenamed(df3, 'first_date', "date") 

df3 <- arrange(df3, 'custId', 'date') 
display(df3) 
+------+----------+----+ 
|custId|  date|desc| 
+------+----------+----+ 
| 1001|2013-08-01| New| 
| 1001|2014-01-01|Good| 
| 1001|2014-03-01| Bad| 
| 1002|2014-02-01|Good| 
| 1003|2014-04-01| New| 
+------+----------+----+