mapPartitionsの出力をListBufferに格納し、そのイテレータを出力として公開します。出力はLongタプル(Tuple2)のリストです。 SparkのSizeEstimator.estimateメソッドを使用してオブジェクトのサイズをチェックすると、レコード/タプルオブジェクトあたり80バイトになります(これは "ListBufferオブジェクトのサイズ/#レコード"によって計算されます)。これは、長い型のTuple2オブジェクト(2つの8バイト長+オブジェクトオーバーヘッドメモリ)に対しては大きすぎると思います。これがなぜそうであるか、そして出力によって取り込まれたメモリを減らす方法は?私は明らかな何かが欠けていると確信しています。spark mapPartition出力オブジェクトのサイズが予想より大きくなっています
また、これらのListBufferオブジェクトはメモリに余りにも大きくなっており、メモリやディスクのスピルが原因でパフォーマンスが低下します。どのように私は単に出力全体をメモリ内オブジェクトとして保存せずにmapPartitionsの出力を書き込むことができますか? mapPartitionsへの各入力レコードは0以上の出力レコードを生成できるので、 "rdd.map"関数イテレータを使用することはできません。それが私の原因を助けてくれるのかどうかわからない。ここ
は、コードスニペットである:それはリスト内の各項目のリストノードを作成するため
var outputRDD = sortedRDD.mapPartitionsWithIndex((partitionNo,p) => {
var outputList = ListBuffer[(Long,Long)]()
var inputCnt: Long = 0;
var outputCnt: Long = 0;
while (p.hasNext) {
inputCnt = inputCnt + 1;
val tpl = p.next()
var partitionKey = ""
try{
partitionKey = tpl._1.split(keyDelimiter)(0) //Partition key
}catch{
case aob : ArrayIndexOutOfBoundsException => {
println("segmentKey:"+partitionKey);
}
}
val value = tpl._2
var xs: Array[Any] = value.toSeq.toArray;
//value.copyToArray(xs);
val xs_string : Array[String] = new Array[String](value.size);
for(i <- 0 to value.size-1){
xs_string(i) = xs(i) match { case None => ""
case null => ""
case _ => xs(i).toString()
}
}
val outputTuples = windowObject.process(partitionKey, xs_string);
if(outputTuples != null){
for (i <- 0 until outputTuples.size()) {
val outputRecord = outputTuples.get(i)
if (outputRecord != null) {
outputList += ((outputRecord.getProfileID1 , outputRecord.getProfileID2))
outputCnt = outputCnt +1;
}
}
}
}
if(debugFlag.equals("DEBUG")){
logger.info("partitionNo:"+ partitionNo + ", input #: "+ inputCnt +", output #: "+ outputCnt+", outputList object size:" + SizeEstimator.estimate(outputList));
}
outputList.iterator
}, false)