エラー:SparkError:XXXXタスクのシリアル化された結果の合計サイズ(2.0 GB)spark.driver.maxResultSizeよりも大きい(2.0 GB)
ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
は目標:モデルを使用して、すべてのユーザーのための勧告を取得し、各ユーザのテストデータと重複し、重複率を生成する。
spark mllibを使用して推奨モデルを作成しました。テストデータのユーザーあたりの重複率とユーザーあたりの推奨アイテムを評価し、平均重複率を生成します。
def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val overlaps = testData.join(recommendations).map(x => {
val moviesPerUserInRecs = x._2._2.toSet
val moviesPerUserInTest = x._2._1.toSet
val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
if(localHitRatio.size > 0)
1
else
0
}).filter(x => x != 0).count
var r = 0.0
if (overlaps != 0)
r = overlaps/n
return r
}
しかし、ここで問題となるのは、maxResultSize
エラーが発生することです。私のスパークの構成では、私はmaxResultSize
を増やすために従いました。
val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")
しかし、それは問題を解決しませんでしたが、私はまだ問題が解決し得なかったドライバーのメモリを割り当て量にほぼ近い行ってきました。コードが実行されている間、私は私の火花の仕事を見守っていました。私が見たのはちょっと困惑しています。ステージコード上で
[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
(行番号正確にわからない)line 277
周り火花mllib recommendForAll
にMatrixFactorizationコードを実行しています。
private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}
recommendForAll
方法はrecommendProductsForUsers
メソッドから呼び出されます。
しかし、メソッドが1Mのタスクを回転させているように見えます。私は混乱して1Mのタスクを吐き出すようになったので、それが問題かもしれないと思います。
私の質問は、どうすればこの問題を解決できますか。このアプローチを使用しないと、実際にはoverlap ratio
または[email protected]
を計算するのは難しいです。これは、スパーク1.5(Clouderaの5.5)である