2017-04-20 5 views
1

こんにちは私のコードでgroupByKeyをよく使う必要がありますが、それは非常に重い操作であることが分かります。パフォーマンスを向上させるために取り組んでいるので、すべてgroupByKeyコールを削除するというアプローチが効率的かどうかは疑問でした。sparkのgroupByKeyをreduceByKeyに置き換えてください

私は別のRDDからRDDとタイプのペア作成(INT、INT)

rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]

を作成するために使用されたと私はこのような何か得るために必要なことから:

[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

私が使用したのはout = rdd1.groupByKeyでしたが、このアプローチは巨大なデータセットでは非常に問題になる可能性があるので、私はこの解決方法を使用すると考えました:

の代わりに、私はタイプのペアから、それを作成してくださいどのようなタイプのペア(INT、INT)の私のRDD rdd1を作成する(INT、一覧の[int])ので、私のrdd1この

rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]

のようなものでした

しかし今回は同じ結果に達するために、私はreduceByKey(_ ::: _)をすべての値をキーで結合しました。このアプローチを使用するとパフォーマンスが向上すると思いますか?私はこのタイプ(Int、List [Int])が唯一の要素を含むリストであるペアを作成するのではないかと恐れていますか?

他の方法を使用して同じ結果に達するより速い方法があると思いますか?ありがとうございました。

+1

I代わりに 'aggregateByKey'または' combineByKey'を使用して、空のリストをインテリアライザーとして使用し、次にそれぞれコンバイナーとマージャーのlist.addとlist.addAllを使用して同じことを考えていました。これは、最初に単一要素のリストを作成することを避けるでしょう。私は、 'groupByKey'が既にこのような場合にうまくいくように最適化されていると信じていました。 – vefthym

答えて

3

私はあなたのエンド結果は

[(1, [2, 3]), (2 , [3, 4]), (3, [5])] 

なぜ可能にする場合、あなたがreduceByKeyを使うべきだと思いませんか?これがgroupByKeyのために作られたものなので、おそらくそれが最も良いでしょう。

groupByKeyの問題は、通常、同じキーを持つすべての値のリスト(または配列)は必要ありませんが、このリストから取得できるものです。リストが本当に必要ない場合は、シャッフルと同じステップで、おそらくreduceByKeyを使って減らすことができます。 reduceByKey

二つの利点:

  • (不必要なネットワークペイロードを回避するために、同じエグゼキュータにある値を減少させる)シャッフル前に還元を開始することができ、それは、との値のアレイ全体をロードすることはありません同じキーをメモリに保存します。これは、配列が数GBの大きな巨大なデータセットでは重要です。

あなたが提示したように、最初のポイントはそれほど重要ではありません(実際のデータの縮小はないので、連結のみです)。リスト全体が必要なので、2番目のポイントは適用されません。

しかし、本当にリスト全体が必要なのか、これが計算の一歩に過ぎないのか、特に大規模なデータセットで作業している場合はどうなるかを強くお勧めします。

+0

私はGraphXでグラフを扱っていますので、リストにあるものはVertexIdなので、リストが必要です。はい、この回答が考えられました。 – Matt

3

私はすべてのgroupByKey呼び出しを削除する私のアプローチが効率的であるかどうか私は思っていました。

RDD.toDebugStringをチェックして、RDD変換の論理計画を確認してください。それは、あなたの行動がどれだけ速くなるかどうかについてのかなり良い概観を与えるはずです。

ShuffledRDDはシャッフル操作が発生するため、通常非常に高価です。

reduceByKeyを使用するあなたの考えについては、keyByと考えてください。

rdd.keyBy(_.kind).reduceByKey(....) 

また、(groupByや親戚の後ろに座っている)最も一般的な変換としてaggregateByKeyを考慮することができます。

最後に、groupByには、パーティション数またはPartitionerを定義できる2つのバリアントがあります。これらは、高価なシャッフルを避けることができます。

org.apache.spark.rdd.PairRDDFunctionsで読んでください。

ウェブUIを使用して、「クエリ」のパフォーマンスをより正確に把握できます。あなたのデータを知ることは多くの助けになります。クエリを最適化する時間が無駄になる可能性があるため、十分な時間を費やしてください。

+1

'keyBy'は、非ペアRDDからの(K、V)タプルを作成しました。 OPのRDDはキー値RDDなので、keyByを使う必要はありません:)第二に、groupByKeyは値を減らすこと以外にも非常に非効率的なことがあります。 OPの質問では、値を減らすことが分かるので、groupByKeyではなくreduceByKeyを使うべきです。 –

1

これに少し遅れているかもしれません。それは他人を助けるかもしれない。

val tuples = List((1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)) 
val context = getContext() // get Spark Context. 
val tuplesRDD = context.parallelize(tuples) 

val list = mutable.MutableList.empty[Int] 
val addItemsToList = (s: mutable.MutableList[Int], v : Int) => s += v 
val mergeLists = (x: mutable.MutableList[Int], 
        y: mutable.MutableList[Int]) => x ++= y 

val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists) 
groupByKey.cache() 
groupByKey.foreach(x => println(x)) 

出力

(1、MutableList(2,3))
(2、MutableList(3、4))
(3、MutableList(5))

関連する問題