2016-05-24 1 views
0

groupByKeyを別のものに置き換える最も良い方法を知りたいと思います。apacheでgroupByKeyを置き換える方法Spark

基本的に私はRDD [(int型、リスト[対策])を取得したいと思い、私の状況:

// consider measures like RDD of objects 
measures.keyBy(_.getId) 
     .groupByKey 

私の考えでは、それはあまりシャッフル原因bacause、代わりにreduceByKeyを使用することです:

measures.keyBy(_.getId) 
     .mapValues(List(_)) 
     .reduceByKey(_++_) 

しかし、私は非常に非効率的であるため、たくさんの不要なリストオブジェクトをインスタンス化する必要があります。

他に誰かがgroupByKeyを置き換えることができますか?

+0

実際に使用する場合に非効率的であるかどうか試してみましたか?これは、自分の考えていることがあなたのデータに当てはまるかどうかを確認する唯一の方法です。 – sgvd

+0

それ以外に、groupByKeyを置き換える他の方法はありますか? – Giorgio

答えて

2

別の方法は、元の値とは異なるタイプに値を結合するための特異的であるaggregateByKeyを、使用している:

measures.keyBy(_.getId) 
     .aggregateByKey(List[Measure]())(_ :+ _, _ ++ _) 

これは、各パーティション内の各キーの空のリストを作成し、これらにすべての値を追加します各パーティションで最後にリストをシャッフルして各キーのすべてを連結します。 Scalaでリストへの追加

はO(N)であり、O(1)であり、先頭に追加することをお勧めしますが、少し少ないクリーンになります

measures.keyBy(_.getId) 
     .aggregateByKey(List[Measure]())(_.+:(_), _ ++ _) 

または:

measures.keyBy(_.getId) 
     .aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _) 

これはおそらくreduceByKeyの例よりも効率的ですが、reduceByKeyaggregateByKeyがはるかに優れている状況がgroupByKeyよりも大きい場合は、最初にデータサイズを大幅に縮小し、ずっと小さな結果をシャッフルすることができます。この場合、中間リストには最初からすべてのデータが含まれているので、パーティションごとのリストが結合されたときにはフルデータセットでシャフリングされます(これはreduceByKeyの場合も同様です)。

  • それがマップ側集約を無効:zero323が指摘したようにそれはすべてのデータのリストを構築して知っており、そのために特別に最適化を行うことができるので

    さらに、groupByKeyは、この場合、実際に、より効率的ですすべてのデータを含む大きなハッシュマップを構築することを防ぎます。

  • スマートバッファ(CompactBuffer)を使用します。これにより、不変リストを1つずつ構築するのに比べてメモリ割り当て量が大幅に削減されます。キーの数は、値の数よりもはるかに小さくない場合groupByKey及びreduceByKey又はaggregateByKeyとの差が最小であってもよい

別の状況です。

+0

実際にはマップ側の削減のために効率が大幅に低下します。'groupByKey'は、GCコストを削減するこのアプローチを特に避けています。たとえば、http://stackoverflow.com/a/33222398/1560062を参照してください。不変バッファによって、さらに悪化します。 – zero323

+0

ありがとう! 'groupByKey'が何をしているのかを少し深く読んで、私の答えを真実に近いところで編集しました。 – sgvd

+0

LGTM、ありがとう:) – zero323

関連する問題