2017-10-05 9 views
1

既存の列セットのgroupby集約を使用して、Pysparkにリストの新しい列を作成しようとしています。例えば、入力されたデータフレームが以下に提供される:collect_list別の変数に基づいて注文を保存する

------------------------ 
id | date  | value 
------------------------ 
1 |2014-01-03 | 10 
1 |2014-01-04 | 5 
1 |2014-01-05 | 15 
1 |2014-01-06 | 20 
2 |2014-02-10 | 100 
2 |2014-03-11 | 500 
2 |2014-04-15 | 1500 

予想される出力は、次のとおり

id | value_list 
------------------------ 
1 | [10, 5, 15, 20] 
2 | [100, 500, 1500] 

リスト内の値は、日付でソートされています。

次のように私はcollect_listを使用してみました:

from pyspark.sql import functions as F 
ordered_df = input_df.orderBy(['id','date'],ascending = True) 
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value")) 

をしかしcollect_listは、私は、集計前の日付で入力されたデータフレームを並べ替える場合でも、順序を保証するものではありません。

2番目の(日付)変数に基づいて注文を保存することで、誰かが集計を行う方法を教えてもらえますか?

答えて

6

日付と値の両方をリストとして収集する場合は、日付に応じて結果の列をudfを使用してソートし、その結果の値のみを保持することができます。

import operator 
import pyspark.sql.functions as F 

# create list column 
grouped_df = input_df.groupby("id") \ 
       .agg(F.collect_list(F.struct("date", "value")) \ 
       .alias("list_col")) 

# define udf 
def sorter(l): 
    res = sorted(l, key=operator.itemgetter(0)) 
    return [item[1] for item in res] 

sort_udf = F.udf(sorter) 

# test 
grouped_df.select("id", sort_udf("list_col") \ 
    .alias("sorted_list")) \ 
    .show(truncate = False) 
+---+----------------+ 
|id |sorted_list  | 
+---+----------------+ 
|1 |[10, 5, 15, 20] | 
|2 |[100, 500, 1500]| 
+---+----------------+ 
+0

おかげ詳細な例については...私はほんの数百万人の大きなデータでそれを試してみましたが、私はcollect_listのそれと全く同一の配列を取得しています...なぜこれは可能性が説明する方法はあります起こっている?また、collect_listだけが日付内の複数の値を持つケースを混乱させるように見えることを確認しました...それはcollect_listもまた注文を維持することを意味しますか? – Ravi

+1

あなたのコードでは、collect_list()の前にデータセット全体をソートします。しかし、これは必ずしも必要ではありません。リスト内の日付と値の両方を収集した後に結果として得られるタプルのリストをソートする方が効率的です。 – mtoto

+0

列をソートし、ソートされた列でcollect_listを使用すると順序が保持されますか? – Ravi

関連する問題