1

私はgroup byとsum関数でpysparkコードを書いています。私はグループごとにパフォーマンスが影響を受けていると感じます。代わりにreducebykeyを使いたい。しかし、私はこの分野では新しいです。以下の私のシナリオ見つけてください、pysparkデータフレームでgroupbyをreducebykeyに変換するには?

ステップ1:ハイブテーブルを読み込むデータフレームにsqlcontextや店舗スルークエリデータを結合

ステップ2:入力列の合計数は、5で15は、キーフィールドであり、残りの数値ですされています。

上記の入力列に加えて、数値列からいくつかの列を派生させる必要があります。デフォルト値を持つ列はほとんどありません。

手順4:group byとsum関数を使用しました。 mapとreducebykeyオプションを使ってspark wayで同様のロジックを実行する方法。

from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show()from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")........sum("coln")).show() 
+0

[DataFrame groupBy動作/最適化](https://stackoverflow.com/questions/32902982/dataframe-groupby-behaviour-optimization)の可能な複製 – user8371915

答えて

1

Spark SQLにはreduceByKeyはありません。

groupBy +集約関数は、RDD.reduceByKeyとほぼ同じです。 Sparkは、RDD.groupByKey(collect_listの場合)またはRDD.reduceByKeyの場合に自動的に選択します。

Dataset.groupBy +集計関数のパフォーマンスは、RDD.reduceByKeyよりも優れている必要があります。 Catalystオプティマイザは、バックグラウンドで集計を行う方法に注意を払います

+0

スパークSQLのgroupBy +アグリゲーションでは、ドライバの代わりにエグゼキュータに最終的なアグリゲーションの追加ステップを追加するだけです。 –

+0

ご返信ありがとうございます。 reduceByKeyをデータフレームに適用できないのですか? reduceByKeyと同じことを伝える多くの記事は、最終段階で行の数を減らすことによってグループ化して以来、大きなデータセットに対してgroup byよりも高速です。 – user3150024

+0

@ user3150024これらの記事はRDDに関するものです - データセットには抽象化レイヤがあり、Catalystオプティマイザはクエリを最適化します:) –

関連する問題