2017-11-20 22 views
0

私はこの問題を表現するのに少し問題があります。Pyspark - 'previous'レコードを検索して、新しい列をデータフレームに追加します。

私は大量のイベントデータを取得しようとしています。要件の1つは、以前のイベントのデータを含めることです。私はpysparkを使用してこれを行う適切な方法を見つけるのに苦労しています。

試して説明する。私のデータフレームを仮定すると、次のようになります。各レコードの場合

uid| id|   event_time| event_value| 
---|---|--------------------|------------| 
1 | 1| 2017-11-20 12:00:00|   a| 
2 | 1| 2017-11-20 13:00:00|   b| 
3 | 2| 2017-11-20 12:00:00|   c| 
4 | 2| 2017-11-20 13:00:00|   d| 
5 | 2| 2017-11-20 14:00:00|   e| 

が、私は同じIDを持つ最新の前のイベントを見つけて、新しい列としてこれを追加します。すなわち

uid| id|   event_time| event_value| previous_event_value| 
---|---|--------------------|------------|---------------------| 
1 | 1| 2017-11-20 12:00:00|   a|     null| 
2 | 1| 2017-11-20 13:00:00|   b|     a| 
3 | 2| 2017-11-20 12:00:00|   c|     null| 
4 | 2| 2017-11-20 13:00:00|   d|     c| 
5 | 2| 2017-11-20 14:00:00|   e|     d| 

私はいくつかのウィンドウ関数を見てきましたが、私はこれが私の使用例をサポートしているとは確信していません。どんな助けもありがとう。

+1

paritioninながらはい、uidと –

+1

を使用しています。私は今のようなものを持っています... 'df = df.withColumn(" previous_event_value "、lag(df.event_value).over(Window.partitionBy(" id ")。orderBy(" event_time "))))' – philantrovert

+0

感謝の両方を昇順EVENT_TIMEことにより、IDと順序によって 'lag'(ウィンドウ関数) – robarthur1

答えて

0

他の誰かがこれにつまずく。上記の提案は完全に機能しました。ラグウィンドウ機能の使用。例えば

df = df.withColumn("previous_event_value", 
    lag(df.event_value).over(Window.partitionBy("id").orderBy("e‌​vent_time"))) 
関連する問題