2016-10-26 3 views
3

私はユーザ名のデータフレームをどのスレッドに投稿したのか、それらの投稿のタイムスタンプを持っています。スレッドの最初のユーザーは誰だったのか、何時だったのかを把握するために何をしようとしているのですか?最初の投稿がスレッドでグループを行い、次にタイムスタンプで分を行うことであることがわかります。しかしそれはユーザー名を削除します。どのようにグループを使用してユーザー名を保持するのですか?グループを使用しているときにSparkで未使用の列を保持していますか?

+2

ユーザー名別 – eliasah

答えて

1

これは、HiveContextとHive named_struct関数を使用して1つのgroupByで実行できます。トリックはminです。左から順に列を評価し、現在の列が等しい場合は次の行に移動するだけで、構造体でminが機能します。したがって、この場合、タイムスタンプの列を比較するだけですが、min関数が結果を吐き出した後にアクセスする名前を含む構造体を作成します。

data = [ 
    ('user', 'thread', 'ts'), 
    ('ryan', 1, 1234), 
    ('bob', 1, 2345), 
    ('bob', 2, 1234), 
    ('john', 2, 2223) 
] 

header = data[0] 
rdd = sc.parallelize(data[1:]) 
df = sqlContext.createDataFrame(rdd, header) 
df.registerTempTable('table') 

sql = """ 
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest 
FROM table 
GROUP BY thread 
""" 

grouped = sqlContext.sql(sql) 
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp') 
1

これは、row_number()ウィンドウ関数を使用して行うことができます。これは、他のすべての列をそのまま維持します。 withColumnを使用して、 "thread_user_order"のような新しい列を作成し、その値はrow_number()PARTITION BYスレッドORDER BY tsである必要があります。 次に、"thread_user_order" == 1をフィルタリングします。あなたが順序でフィールドをソートして、一度に2つの列を維持する構造体のソート順を利用することができます

df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order").equalTo(1)) 
1

:ここ

は、いくつかの擬似コードです。その後、 minと呼ぶと、最初にタイムスタンプでソートされ、次に2回ネストされたときにユーザー名がソートされます。

user_time = functions.struct(df.timestamp, df.username).alias('user_time') 
min_thread_users_df = df.select(df.thread, user_time).groupby('thread').agg(
    functions.min('user_time').alias('user_time')).select(
    'thread', 'user_time.username', 'user_time.timestamp') 
関連する問題