私は自分のスパークデータにローリング時間ウィンドウで定義された "ユーザーセッション"を統合しようとしています。2つの整数の差は、pySparkでNoneを返しますか?
私はそのために、この問題を使用している:How to aggregate over rolling time window with groups in Spark
私との違いは、私は私の時間ウィンドウは約5時間になりたいので、その意志は日数を返しDateDiff関数、私は使用することはできませんです。ここで
は私のデータセットである:
[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]
私は必要なのは、ユーザが、そのインデックスセッションを意志列を追加することです。 (たとえば、auction_id 9999はセッション0で、auction_id 8888とauction_id 5555はセッション1です(9999〜8888の間に多くの日数があり、8888〜5555の間に数日しかないためです)。 。ユーザーここ
が私のコードです:。 終わり# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))
# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")
# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")
# We increment for each indicator = 1
subgroup = sum(test).over(w).alias("session_index")
# We get everything
df = df.select("*", subgroup)
、SESSION_INDEXは皆のため0で問題がラインdiff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
から来てここでは、それが点灯している(0)たびに返されます。 (私は0値を変更してチェックしたので)私はいくつかの行を変更してスクリプトを単純化しようとしています:
test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")
私はcoalesce関数とabs関数を削除しました。 session_indexは、各行に対して「なし」です。
私がテストをtest = "timestamp"
に置き換えた場合、これはうまくいくでしょう:私はタイムスタンプの累積合計を取得します。
test = lag("timestamp", 1).over(w)
で置き換えても問題ありません。前の行がないため、ユーザーの最初の行にはNoneが表示され、次に累積された合計が表示されます。
2つの整数を減算しようとすると問題が発生します。しかし、なぜ私は理解していないのですか?それは2つの整数です、結果は整数でなければなりません、そうではありませんか?
ご協力いただきありがとうございます。