2016-04-06 7 views
2

私はこのようなテーブルの処理だ最小限に抑える方法:スパークreduceByKeyとシャッフル

ID f1 
001 1 
001 2 
001 3 
002 0 
002 7 

をし、私がいることを、同じIDのF1列の合計を計算し、その合計を使用して新しい列を作成したいです次のとおりです。

ID f1 sum_f1 
001 1 6 
001 2 6 
001 3 6 
002 0 7 
002 7 7 

私のソリューションは、reduceByKeyで合計を計算し、元の表と結果を結合です:

val table = sc.parallelize(Seq(("001",1),("001",2),("001",3),("002",0),("002",7))) 
val sum = table.reduceByKey(_ + _) 
val result = table.leftOuterJoin(sum).map{ case (a,(b,c)) => (a, b, c.getOrElse(-1))} 

と私は正しい結果を得る:

result.collect.foreach(println) 

出力:

(002,0,7) 
(002,7,7) 
(001,1,6) 
(001,2,6) 
(001,3,6) 

問題は、私はコードを書く場合はそこに2つのシャッフルステージは、コード内にあるreduceByKey、leftOuterJoin内の他の1つが、ありますHadoop MapReduceでは、シャッフルステージを1つだけ使用して同じ結果を得るのは簡単です(削減ステージではoutputer.collectの機能を複数回使用します)。 シャッフルを使って作業をするより良い方法があるのだろうかと思っていました。どんな提案も感謝します。

+1

質問を明示的に表現するために、質問のタイトルを変更する必要があると思います。 –

答えて

1

もう1つの方法は、aggregateByKeyを使用することです。これは、メソッド を理解することがdiffcultが、火花のドキュメントからのものであってもよい:

groupByKey)注意:この操作は非常に高価である可能性があります。 の順にグループ化して、各キーで集計(合計や平均など)を実行する場合は、をPairRDDFunctions.aggregateByKeyまたはPairRDDFunctions.reduceByKey とすると、パフォーマンスが大幅に向上します。

また、aggregateByKeyも知っておく価値があります。

もちろん、ここでは合計のような単純な集計は行っていませんので、 のこのアプローチのパフォーマンス上の利点対groupByKeyは存在しない可能性があります。明らかに両方のアプローチを実際のデータにベンチマークすることは良い考えです。ここで

は、詳細な実装です:

// The input as given by OP here: http://stackoverflow.com/questions/36455419/spark-reducebykey-and-keep-other-columns 
val table = sc.parallelize(Seq(("001", 1), ("001", 2), ("001", 3), ("002", 0), ("002", 7))) 

// zero is initial value into which we will aggregate things. 
// The second element is the sum. 
// The first element is the list of values which contributed to this sum. 
val zero = (List.empty[Int], 0) 

// sequencer will receive an accumulator and the value. 
// The accumulator will be reset for each key to 'zero'. 
// In this sequencer we add value to the sum and append to the list because 
// we want to keep both. 
// This can be thought of as "map" stage in classic map/reduce. 
def sequencer(acc: (List[Int], Int), value: Int) = { 
    val (values, sum) = acc 
    (value :: values, sum + value) 
} 

// combiner combines two lists and sums into one. 
// The reason for this is the sequencer may run in different partitions 
// and thus produce partial results. This step combines those partials into 
// one final result. 
// This step can be thought of as "reduce" stage in classic map/reduce. 
def combiner(left: (List[Int], Int), right: (List[Int], Int)) = { 
    (left._1 ++ right._1, left._2 + right._2) 
} 

// wiring it all together. 
// Note the type of result it produces: 
// Each key will have a list of values which contributed to the sum, sum the sum itself. 
val result: RDD[(String, (List[Int], Int))] = table.aggregateByKey(zero)(sequencer, combiner) 

// To turn this to a flat list and print, use flatMap to produce: 
// (key, value, sum) 
val flatResult: RDD[(String, Int, Int)] = result.flatMap(result => { 
    val (key, (values, sum)) = result 
    for (value <- values) yield (key, value, sum) 
}) 

// collect and print 
flatResult.collect().foreach(println) 

これが生成します。あなたはそれを参照したい場合はここで

(001,1,6) 
(001,2,6) 
(001,3,6) 
(002,0,7) 
(002,7,7) 

も、上記 の完全実行可能なバージョンとの主旨である:https://gist.github.com/ppanyukov/253d251a16fbb660f225fb425d32206a

+0

ありがとうございました!スパークを学ぶために長い道のり:) –

0

あなたは、値のリストを取得するの和を取り、flatMapValuesでラインを再作成するgroupByKeyを使用することができます。

val g = table.groupByKey().flatMapValues { f1s => 
    val sum = f1s.reduce(_ + _) 
    f1s.map(_ -> sum) 
} 

しかしreduceこのコードでは、ローカルに動作しますので、単一のキーがあまりにも持っている場合、これは失敗します。多くの価値。

別のアプローチは、joinを維持することですが、パーティションは最初、そうは、参加安いです:

val partitioned = table.partitionBy(
    new org.apache.spark.HashPartitioner(table.partitions.size)) 
partitioned.cache // May or may not improve performance. 
val sum = partitioned.reduceByKey(_ + _) 
val result = partitioned.join(sum) 

私がより速くなると思われる推測することはできません。私はすべてのオプションをベンチマークしたいと思います。

+0

ありがとう!私が自由な時にそれらのベンチマークを行い、コメントにあなたを教えます。 –

関連する問題