2017-01-25 12 views
2

私は自分のスパークデータにローリング時間ウィンドウで定義された "ユーザーセッション"を統合しようとしています。2つの整数の差は、pySparkでNoneを返しますか?

私はそのために、この問題を使用している

How to aggregate over rolling time window with groups in Spark

私との違いは、私は私の時間ウィンドウは約5時間になりたいので、その意志は日数を返しDateDiff関数、私は使用することはできませんです。ここで

は私のデータセットである:

[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123), 
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123), 
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123), 
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456), 
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456), 
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456), 
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789), 
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789), 
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789), 
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)] 

私は必要なのは、ユーザが、そのインデックスセッションを意志列を追加することです。 (たとえば、auction_id 9999はセッション0で、auction_id 8888とauction_id 5555はセッション1です(9999〜8888の間に多くの日数があり、8888〜5555の間に数日しかないためです)。 。ユーザーここ

が私のコードです:。

終わり
# Add a timestamp (integer) column 
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer')) 

# We partition by user and order by timestamp 
w = Window.partitionBy("user_id_64").orderBy("timestamp") 

# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned. 
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0)) 

# If difference higher than 5 hours 
indicator = (diff > 5 * 60 * 60).cast("integer") 

# We increment for each indicator = 1 
subgroup = sum(test).over(w).alias("session_index") 

# We get everything 
df = df.select("*", subgroup) 

、SESSION_INDEXは皆のため0で問題がラインdiff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))から来てここでは、それが点灯している(0)たびに返されます。 (私は0値を変更してチェックしたので)私はいくつかの行を変更してスクリプトを単純化しようとしています:

test = "timestamp" - lag("timestamp", 1).over(w) 
subgroup = sum(test).over(w).alias("session_index") 

私はcoalesce関数とabs関数を削除しました。 session_indexは、各行に対して「なし」です。

私がテストをtest = "timestamp"に置き換えた場合、これはうまくいくでしょう:私はタイムスタンプの累積合計を取得します。

test = lag("timestamp", 1).over(w)で置き換えても問題ありません。前の行がないため、ユーザーの最初の行にはNoneが表示され、次に累積された合計が表示されます。

2つの整数を減算しようとすると問題が発生します。しかし、なぜ私は理解していないのですか?それは2つの整数です、結果は整数でなければなりません、そうではありませんか?

ご協力いただきありがとうございます。

答えて

0

2つの整数の間に違いがあったのであれば変わってしまいます。さんは再び犯人を見てみましょう:引き算のための

coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0)) 

左側のオペランドがstrです。 strColumnで動作できる__sub__を持っていないので、右オペランドの__rsub__を使用します。 Columnの一般的なdunderメソッドでは、標準のPython型をリテラルとして解釈します。だから、あなたのコードは実際には文字列 "timestamp"から整数を減算しようとし、結果は未定義です。

TL; DRあなたは左側のオペランドとしてColumnを使用する必要があります。

from pyspark.sql.functions import col 

coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))