私はこのようなテーブルの処理だ最小限に抑える方法:スパークreduceByKeyとシャッフル
ID f1
001 1
001 2
001 3
002 0
002 7
をし、私がいることを、同じIDのF1列の合計を計算し、その合計を使用して新しい列を作成したいです次のとおりです。
ID f1 sum_f1
001 1 6
001 2 6
001 3 6
002 0 7
002 7 7
私のソリューションは、reduceByKey
で合計を計算し、元の表と結果を結合です:
val table = sc.parallelize(Seq(("001",1),("001",2),("001",3),("002",0),("002",7)))
val sum = table.reduceByKey(_ + _)
val result = table.leftOuterJoin(sum).map{ case (a,(b,c)) => (a, b, c.getOrElse(-1))}
と私は正しい結果を得る:
result.collect.foreach(println)
出力:
(002,0,7)
(002,7,7)
(001,1,6)
(001,2,6)
(001,3,6)
問題は、私はコードを書く場合はそこに2つのシャッフルステージは、コード内にあるreduceByKey、leftOuterJoin内の他の1つが、ありますHadoop MapReduceでは、シャッフルステージを1つだけ使用して同じ結果を得るのは簡単です(削減ステージではoutputer.collect
の機能を複数回使用します)。 シャッフルを使って作業をするより良い方法があるのだろうかと思っていました。どんな提案も感謝します。
質問を明示的に表現するために、質問のタイトルを変更する必要があると思います。 –