2017-07-13 6 views
0

私は以下のように顧客のSpark DataFrameを持っています。顧客が「新しい」のvalueを取得custIDための最初の月の観測でSparkで元に戻す値の特定

#SparkR code 
customers <- data.frame(custID = c("001", "001", "001", "002", "002", "002", "002"), 
    date = c("2017-02-01", "2017-03-01", "2017-04-01", "2017-01-01", "2017-02-01", "2017-03-01", "2017-04-01"), 
    value = c('new', 'good', 'good', 'new', 'good', 'new', 'bad')) 
customers <- createDataFrame(customers) 
display(customers) 

custID| date  | value 
-------------------------- 
001 | 2017-02-01| new 
001 | 2017-03-01| good 
001 | 2017-04-01| good 
002 | 2017-01-01| new 
002 | 2017-02-01| good 
002 | 2017-03-01| new 
002 | 2017-04-01| bad 

。その後、それらは「良い」または「悪い」として分類される。しかし、顧客が第2の口座を開設した場合、「良い」または「悪い」から「新規」に戻すことは可能です。これが起こったとき、私は、顧客が次のように2番目の口座を開設したことを示すために、 '1'ではなく '2'でタグ付けします。スパークでどうすればいいですか? SparkRまたはPySparkコマンドが機能します。 pysparkで

#What I want to get 
custID| date  | value | tag 
-------------------------------- 
001 | 2017-02-01| new | 1 
001 | 2017-03-01| good | 1 
001 | 2017-04-01| good | 1 
002 | 2017-01-01| new | 1 
002 | 2017-02-01| good | 1 
002 | 2017-03-01| new | 2 
002 | 2017-04-01| bad | 2 

答えて

0

from pyspark.sql import functions as f 

spark = SparkSession.builder.getOrCreate() 

# df is equal to your customers dataframe 
df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', header=True, sep='|').cache() 

df_new = df.filter(df['value'] == 'new').withColumn('tag', f.rank().over(Window.partitionBy('custID').orderBy('date'))) 
df = df_new.union(df.filter(df['value'] != 'new').withColumn('tag', f.lit(None))) 
df = df.withColumn('tag', f.collect_list('tag').over(Window.partitionBy('custID').orderBy('date'))) \ 
    .withColumn('tag', f.UserDefinedFunction(lambda x: x.pop(), IntegerType())('tag')) 

df.show() 

そして出力:ところで

+------+----------+-----+---+             
|custID|  date|value|tag| 
+------+----------+-----+---+ 
| 001|2017-02-01| new| 1| 
| 001|2017-03-01| good| 1| 
| 001|2017-04-01| good| 1| 
| 002|2017-01-01| new| 1| 
| 002|2017-02-01| good| 1| 
| 002|2017-03-01| new| 2| 
| 002|2017-04-01| bad| 2| 
+------+----------+-----+---+ 

パンダ簡単にそれを行うことができます。

+0

を持つすべてのレコードをフィルタリングおかげで、私はRまたはパンダでこれを行うことができますが、私は持っていますSparkが必要な非常に大きなデータフレーム。 –

0

これは、次のコードを使用して行うことができます。

「新しい」

df_new<-sql("select * from df where value="new") 
createOrReplaceTempView(df_new,"df_new") 

df_new<-sql("select *,row_number() over(partiting by custID order by date) 
tag from df_new") 
createOrReplaceTempView(df_new,"df_new") 

df<-sql("select custID,date,value,min(tag) as tag from 
(select t1.*,t2.tag from df t1 left outer join df_new t2 on 
t1.custID=t2.custID and t1.date>=t2.date) group by 1,2,3") 
関連する問題