2017-01-13 7 views
1

注文したRDDのタイプが((id, ts), some value)です。これは、idフィールドのカスタムパーティショナーを使用してパーティション化されました。パーティション化されたデータでgroupByKey/reduceBuKeyを実行していますが、キーが異なる

math.abs(id.hashCode % numPartitions) 

は今、私はそれがシャッフルし、データセットの再分割を伴うだろう、このパーティションRDDに、次の2つの機能を実行する場合。

val partitionedRDD: ((id:Long, ts:Long), val:String) = <Some Function> 
val flatRDD = orderedRDD.map(_ => (_._1.id, (_._1.ts, _._2))) 

私は、flatRDD.groupByKey()flatRDD.reduceByKey()かどうかをもう一度データセットをシャッフルし、新しいパーティションを作成しますpartitionedRDDやスパークと同じパーティションを持つことになりますされて知りたいですか?

おかげで、 デビ

答えて

1

はい、flatRDDgroupByKeyまたはreduceByKeyを行うことは、必ずしも別のシャッフルが発生します。

あなたが知っているので、あなたのflatRDDがすでにidで仕切られ、あなたが安全に同じidを持つすべてのレコードが1つのパーティション内に存在すると仮定することができます。あなたがgroupBy(id)する場合はそのため、あなたは、このようにあなたのデータをシャッフルからスパークを防止し、別途(preservesPartitioning = true付き)mapPartitionsを使用し、各パーティション上で操作を実行することができます。

flatRDD.mapPartitions({ it => 
    it.toList 
    .groupBy(_._1).mapValues(_.size) // some grouping + reducing the result 
    .iterator 

}, preservesPartitioning = true) 

これは、余分なシャッフルが発生することはありません。

enter image description here

関連する問題