2016-07-05 4 views
0

私は、ユーザーID、ゲームID、スコア、およびゲームが行われた時のタイムスタンプを含むユーザーゲームセッションを持っています。次のアイテムからの最初のグループ化アイテムの集約

from pyspark import SparkContext 
from pyspark.sql import HiveContext 
from pyspark.sql import functions as F 

sc = SparkContext("local") 

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame([ 
    ("u1", "g1", 10, 0), 
    ("u1", "g3", 2, 2), 
    ("u1", "g3", 5, 3), 
    ("u1", "g4", 5, 4), 
    ("u2", "g2", 1, 1), 
], ["UserID", "GameID", "Score", "Time"]) 

所望の出力

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 

私は、ユーザーが初めてプレイしたゲームだけでなく、第二のゲームの最大スコアの最大スコアを取得するようにデータを変換したい(ボーナス後続のすべてのゲームの最大得点を得ることができれば)。残念ながら、私はそれがSpark SQLでどうできるかはわかりません。

私はUserID、GameIDでグループ化してから最大スコアと最小時間を取得することができます。そこから進める方法はわからない。

明確化:MaxScoreGame1とMaxScoreGame2は、1番目と2番目のゲームユーザープレイヤーを指します。 GameIDではなく

答えて

1

ウィンドウ機能とピボットの組み合わせを試すことができます。

  1. UserIDで区切られたすべてのゲームの行番号をTimeで指定します。
  2. GameNumberが1または2であることをフィルタリングします。
  3. 希望の出力形状を得るためにピボットします。

残念ながら、私はscalaをPythonではなく使用していますが、以下はPythonライブラリにかなり簡単に移植できるはずです。

import org.apache.spark.sql.expressions.Window 

// Use a window function to get row number 
val rowNumberWindow = Window.partitionBy(col("UserId")).orderBy(col("Time")) 

val output = { 
    df 
    .select(
     col("*"), 
     row_number().over(rowNumberWindow).alias("GameNumber") 
    ) 
    .filter(col("GameNumber") <= lit(2)) 
    .groupBy(col("UserId")) 
    .pivot("GameNumber") 
    .agg(
     sum(col("Score")) 
    ) 
} 

output.show() 

+------+---+----+ 
|UserId| 1| 2| 
+------+---+----+ 
| u1| 10| 2| 
| u2| 1|null| 
+------+---+----+ 
+1

また、あなたは出力に二つ以上のゲームだけでフィルタリングしないとピボットは残りの世話をする見たい場合は追加します。 – Blakey

+0

ウィンドウとrow_numberがトリックを行いました。 PySparkに私のソリューションを掲載するつもりですが、少し違っています。あなたのコードがショーで動作することを確認できますか?私はあなたに答えを与えることができますか? – ksindi

+1

ちょうど出力で更新され、私は実際にピボット上でgroupByの代わりにselectを使用していたことに気付きましたが、これはうまくいきませんでした。あなたのポスト( "u1"、 "g3"、2、2)、( "u1"、または "u1")のように、元のデータフレームにタイプミスがあると仮定して、 "g3"、5,3)、 – Blakey

1

PySparkとソリューション:

from pyspark.sql import Window 

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time")) 

(df 
.groupBy("UserID", "GameID") 
.agg(F.max("Score").alias("Score"), 
     F.min("Time").alias("Time")) 
.select(F.col("*"), 
     F.row_number().over(rowNumberWindow).alias("GameNumber")) 
.filter(F.col("GameNumber") <= F.lit(2)) 
.withColumn("GameMaxScoreCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber"))) 
.groupBy("UserID") 
.pivot("GameMaxScoreCol") 
.agg(F.max("Score")) 
).show() 

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 
関連する問題