2017-11-13 18 views
0

私は2つのPySpark DataFramesを持っています。次のように私は両方のデータフレームに参加:df1がそれを持っていませんがDataFramesに参加し、タイムスタンプで最新の行を取得するにはどうすればよいですか?

df = df1.join(df2,['col1', 'col2'], 'inner') 

DATAFRAME df2は、列timestampがあります

df1 = 
col1 col2 
AA  11 
BB  22 

df2 = 
timestamp col1 col2 col3 
1510586134 AA  11 3 
1510586140 AA  11 2 
1510586200 AA  11 5 
1510586134 BB  22 3 

にはどうすればtimestampに応じdf2の最新の行によってデータフレームに参加することができます?

結果は以下のようになります。

col1 col2 col3 
AA  11  5 
BB  22  3 
+0

使用ウィンドウ関数。 window.partitionBy( "col1"、 "col2")。orderBy( "timestamp") 'そして、結果のデータフレームを' col1、col2'をキーとして結合します。 – philantrovert

答えて

1

は、この情報がお役に立てば幸い!

from pyspark.sql.functions import col, rank 
from pyspark.sql.window import Window 

#sample data 
df1 = sc.parallelize([ 
    ['AA', 11], 
    ['BB', 22] 
]).toDF(('col1', 'col2')) 
df2 = sc.parallelize([ 
    [1510586134, 'AA', 11, 3], 
    [1510586140, 'AA', 11, 2], 
    [1510586200, 'AA', 11, 5], 
    [1510586134, 'BB', 22, 3] 
]).toDF(('timestamp', 'col1', 'col2', 'col3')) 

#select latest row of df2 according to timestamp 
df2_temp = df2.withColumn('timestamp_format_col', col('timestamp').cast("timestamp")) 
window = Window.partitionBy('col1','col2').\ 
    orderBy(col('timestamp_format_col').desc()) 
df2_temp = df2_temp.\ 
    select('*', rank().over(window).alias('rank')).\ 
    filter(col('rank')==1).\ 
    drop('rank','timestamp','timestamp_format_col') 

#final result 
df = df1.join(df2_temp, ['col1', 'col2'], 'inner') 
df.show() 

出力は次のようになります。最新の行と下駄のデータフレームに

+----+----+----+ 
|col1|col2|col3| 
+----+----+----+ 
| BB| 22| 3| 
| AA| 11| 5| 
+----+----+----+ 
+0

@Dinosauriusは、あなたはあなたの問題を解決する:) – Prem

関連する問題