私自身のパーティショナーを実装し、元のrddをシャッフルしようとしましたが、問題が発生しました。私は、これはorg.apache.spark.SparkException:タスクがシリアライズ可能でない場合、
を追加した後に、シリアライズされない機能を参照することによって引き起こされているが、知っておきreleventクラスへ
Serializableを拡張し、この問題がまだ存在します。私は何をすべきか? org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:166)でタスク直列化可能ではない : org.apache.sparkのスレッド "メイン" org.apache.spark.SparkExceptionで
例外org.apache.spark.SparkContext.cleanで (SparkContext.scala:1622):.util.ClosureCleaner $ .clean(158 ClosureCleaner.scala)
object STRPartitioner extends Serializable{
def apply(expectedParNum: Int,
sampleRate: Double,
originRdd: RDD[Vertex]): Unit= {
val bound = computeBound(originRdd)
val rdd = originRdd.mapPartitions(
iter => iter.map(row => {
val cp = row
(cp.coordinate, cp.copy())
}
)
)
val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd)
val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd, partitioner)
shuffled.setSerializer(new KryoSerializer(new SparkConf(false)))
val result = shuffled.collect()
}
class STRPartitioner(expectedParNum: Int,
sampleRate: Double,
bound: MBR,
rdd: RDD[_ <: Product2[Coordinate, Vertex]])
extends Partitioner with Serializable {
...
}