2016-04-15 3 views
4

私は複数の列を持つSpark DataFrameを持っています。私は1つの列に基づいて行をグループ化し、各グループの2番目の列のモードを見つけるしたいと思います。パンダのデータフレームでの作業、私はこのような何かをするだろう:pyspark以内(py)でグループ化されたデータのモードSpark

rand_values = np.random.randint(max_value, 
           size=num_values).reshape((num_values/2, 2)) 
rand_values = pd.DataFrame(rand_values, columns=['x', 'y']) 
rand_values['x'] = rand_values['x'] > max_value/2 
rand_values['x'] = rand_values['x'].astype('int32') 

print(rand_values) 
## x y 
## 0 0 0 
## 1 0 4 
## 2 0 1 
## 3 1 1 
## 4 1 2 

def mode(series): 
    return scipy.stats.mode(series['y'])[0][0] 

rand_values.groupby('x').apply(mode) 
## x 
## 0 4 
## 1 1 
## dtype: int64 

、私はどのようにのために途方に暮れてよやって単一の列のモード

df = sql_context.createDataFrame(rand_values) 

def mode_spark(df, column): 
    # Group by column and count the number of occurrences 
    # of each x value 
    counts = df.groupBy(column).count() 

    # - Find the maximum value in the 'counts' column 
    # - Join with the counts dataframe to select the row 
    # with the maximum count 
    # - Select the first element of this dataframe and 
    # take the value in column 
    mode = counts.join(
     counts.agg(F.max('count').alias('count')), 
     on='count' 
    ).limit(1).select(column) 

    return mode.first()[column] 

mode_spark(df, 'x') 
## 1 
mode_spark(df, 'y') 
## 1 

を見つけることができていますその機能をグループ化されたデータに適用します。このロジックをDataFrameに直接適用することができない場合は、他の手段で同じ効果を得ることは可能ですか?

ありがとうございます!

+1

グループを(x、y)で集計し、カウントで集計し、ここに示すように最大行を選択してください:http://stackoverflow.com/a/35226857/1560062 – zero323

+0

クイック返信ありがとう!私はこれを試してみよう! – bjack3

+2

それは働いたように見えます!私のソリューションを追加することは可能ですか?あなたを「回答者」としてマークしますか? – bjack3

答えて

7

解決策はzero323です。

オリジナル溶液:https://stackoverflow.com/a/35226857/1560062

まず、各(x、y)の組み合わせの回出数えます。

counts = df.groupBy(['x', 'y']).count().alias('counts') 
counts.show() 
## +---+---+-----+ 
## | x| y|count| 
## +---+---+-----+ 
## | 0| 1| 2| 
## | 0| 3| 2| 
## | 0| 4| 2| 
## | 1| 1| 3| 
## | 1| 3| 1| 
## +---+---+-----+ 

解決策1: 'x'でグループ化し、各グループのカウントの最大値を取って集計します。最後に、 'count'列を削除します。

result = (counts 
      .groupBy('x') 
      .agg(F.max(F.struct(F.col('count'), 
           F.col('y'))).alias('max')) 
      .select(F.col('x'), F.col('max.y')) 
     ) 
result.show() 
## +---+---+ 
## | x| y| 
## +---+---+ 
## | 0| 4| 
## | 1| 1| 
## +---+---+ 

解決策2:ウィンドウを使用して 'x'でパーティションを区切り、 'count'列で並べ替えます。ここで、各パーティションの最初の行を選択します。

win = Window().partitionBy('x').orderBy(F.col('count').desc()) 
result = (counts 
      .withColumn('row_num', F.rowNumber().over(win)) 
      .where(F.col('row_num') == 1) 
      .select('x', 'y') 
     ) 
result.show() 
## +---+---+ 
## | x| y| 
## +---+---+ 
## | 0| 1| 
## | 1| 1| 
## +---+---+ 

2つの結果は、行のソート方法が異なるため、結果が異なります。結び目がない場合、2つの方法で同じ結果が得られます。

関連する問題