0

私はSpark 2.0.0とデータフレームを使用しています。私が欲しいものPysparkのGroupByオブジェクトからOrderedリストへの変換

| id | year  | qty | 
|----|-------------|--------| 
| a | 2012  | 10  | 
| b | 2012  | 12  | 
| c | 2013  | 5  | 
| b | 2014  | 7  | 
| c | 2012  | 3  | 

| id | year_2012 | year_2013 | year_2014 | 
|----|-----------|-----------|-----------| 
| a | 10  | 0   | 0   | 
| b | 12  | 0   | 7   | 
| c | 3   | 5   | 0   | 

または

| id | yearly_qty | 
|----|---------------| 
| a | [10, 0, 0] | 
| b | [12, 0, 7] | 
| c | [3, 5, 0]  | 

私が見つけた最も近いソリューションがcollect_list()であるが、この関数はの注文を提供していないよう は、ここに私の入力データフレームでありますリスト。ループを使用して、すべてのIDをフィルタリングすることなく、これを生成する方法はあり

data.groupBy('id').agg(collect_function) 

:私の心の中で解決策は次のようにすべきですか?

答えて

3

最初のものは容易pivot用いて達成することができる:配列形式にさらに変換することができる

from itertools import chain 

years = sorted(chain(*df.select("year").distinct().collect())) 
df.groupBy("id").pivot("year", years).sum("qty") 

from pyspark.sql.functions import array, col 

(... 
    .na.fill(0) 
    .select("id", array(*[col(str(x)) for x in years]).alias("yearly_qty"))) 

直接第1の取得は、おそらく以降のすべての大騒ぎ価値がありません最初に空白を埋める必要があります。それでもあなたは試みることができる:

from pyspark.sql.functions import collect_list, struct, sort_array, broadcast 
years_df = sc.parallelize([(x,) for x in years], 1).toDF(["year"]) 

(broadcast(years_df) 
    .join(df.select("id").distinct()) 
    .join(df, ["year", "id"], "leftouter") 
    .na.fill(0) 
    .groupBy("id") 
    .agg(sort_array(collect_list(struct("year", "qty"))).qty.alias("qty"))) 

またstruct収集するためのサポートを得るために2.0+スパークが必要です。

どちらの方法も非常に高価ですので、これらの方法を使用する際は注意が必要です。大雑把に言えば、長いものは長いものよりも優れています。

+0

ありがとう、ピボットは私が探しているものです! – CodeMySky

+0

pysparkで 'struct'を収集していますか? – eliasah

+0

Sparkの@eliash 'collect_ *'は、Spark <2.0ではアトミックのみをサポートしていません。 – zero323

関連する問題