2017-12-19 9 views
0

pysparkにbussiness current viewと同等のものを作成する必要があります。履歴ファイルとデルタファイル(IDと日付を含む)があります。最終的なデータフレームを作成する必要があります。各idの単一レコードがあり、そのレコードは最新の日付でなければなりません。Pyspark - pysparkで現在のビジネスビューを作成する

df1=sql_context.createDataFrame([("3000", "2017-04-19"), ("5000", "2017-04-19"), ("9012", "2017-04-19")], ["id", "date"]) 
df2=sql_context.createDataFrame([("3000", "2017-04-18"), ("5120", "2017-04-18"), ("1012", "2017-04-18")], ["id", "date"]) 

DF3 = df2.union(DF1).distinct()

+----+----------+ 
| id|  date| 
+----+----------+ 
|3000|2017-04-19| 
|3000|2017-04-18| 
|5120|2017-04-18| 
|5000|2017-04-19| 
|1012|2017-04-18| 
|9012|2017-04-19| 

+ ---- + ---------- +

私がやってみました私はid = 3000の両方の日付のためにid = 300の日付だけを記録する必要があります= 2017-04-19

すべての行を返すので、いずれかのdfです。

所望の出力: -

+----+----------+ 
| id|  date| 
+----+----------+ 
|3000|2017-04-19| 
| 
|5120|2017-04-18| 
|5000|2017-04-19| 
|1012|2017-04-18| 
|9012|2017-04-19| 
+----+----------+ 

答えて

0

・ホープ、このことができます!

from pyspark.sql.functions import unix_timestamp, col, to_date, max 

#sample data 
df1=sqlContext.createDataFrame([("3000", "2017-04-19"), 
           ("5000", "2017-04-19"), 
           ("9012", "2017-04-19")], 
           ["id", "date"]) 
df2=sqlContext.createDataFrame([("3000", "2017-04-18"), 
           ("5120", "2017-04-18"), 
           ("1012", "2017-04-18")], 
           ["id", "date"]) 
df=df2.union(df1) 
df.show() 

#convert 'date' column to date type so that latest date can be fetched for an ID 
df = df.\ 
    withColumn('date_inDateFormat',to_date(unix_timestamp(col('date'),"yyyy-MM-dd").cast("timestamp"))).\ 
    drop('date') 

#get latest date for an ID 
df = df.groupBy('id').agg(max('date_inDateFormat').alias('date')) 
df.show() 

出力は次のとおりです。

+----+----------+ 
| id|  date| 
+----+----------+ 
|5000|2017-04-19| 
|1012|2017-04-18| 
|5120|2017-04-18| 
|9012|2017-04-19| 
|3000|2017-04-19| 
+----+----------+ 

注:答えはあなたの問題を解決するのに役立ちます場合let SO knowすることを忘れないでください。

+0

ありがとうございました。それは間違いなく機能し、複製機能も動作します。 – Shrikant

+0

それは助けて嬉しい:) – Prem

関連する問題