2015-12-02 6 views
7

エラー:SparkError:XXXXタスクのシリアル化された結果の合計サイズ(2.0 GB)spark.driver.maxResultSizeよりも大きい(2.0 GB)

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB) 

は目標:モデルを使用して、すべてのユーザーのための勧告を取得し、各ユーザのテストデータと重複し、重複率を生成する。

spark mllibを使用して推奨モデルを作成しました。テストデータのユーザーあたりの重複率とユーザーあたりの推奨アイテムを評価し、平均重複率を生成します。

def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = { 

    val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey 
    val n = testData.count 

    val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20) 
     .mapValues(_.map(r => r.product)) 

    val overlaps = testData.join(recommendations).map(x => { 
     val moviesPerUserInRecs = x._2._2.toSet 
     val moviesPerUserInTest = x._2._1.toSet 
     val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest) 
     if(localHitRatio.size > 0) 
     1 
     else 
     0 
    }).filter(x => x != 0).count 

    var r = 0.0 
    if (overlaps != 0) 
     r = overlaps/n 

    return r 

    } 

しかし、ここで問題となるのは、maxResultSizeエラーが発生することです。私のスパークの構成では、私はmaxResultSizeを増やすために従いました。

val conf = new SparkConf() 
conf.set("spark.driver.maxResultSize", "6g") 

しかし、それは問題を解決しませんでしたが、私はまだ問題が解決し得なかったドライバーのメモリを割り当て量にほぼ近い行ってきました。コードが実行されている間、私は私の火花の仕事を見守っていました。私が見たのはちょっと困惑しています。ステージコード上で

[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB) 

(行番号正確にわからない)line 277周り火花mllib recommendForAllにMatrixFactorizationコードを実行しています。

private def recommendForAll(
     rank: Int, 
     srcFeatures: RDD[(Int, Array[Double])], 
     dstFeatures: RDD[(Int, Array[Double])], 
     num: Int): RDD[(Int, Array[(Int, Double)])] = { 
    val srcBlocks = blockify(rank, srcFeatures) 
    val dstBlocks = blockify(rank, dstFeatures) 
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { 
     case ((srcIds, srcFactors), (dstIds, dstFactors)) => 
     val m = srcIds.length 
     val n = dstIds.length 
     val ratings = srcFactors.transpose.multiply(dstFactors) 
     val output = new Array[(Int, (Int, Double))](m * n) 
     var k = 0 
     ratings.foreachActive { (i, j, r) => 
      output(k) = (srcIds(i), (dstIds(j), r)) 
      k += 1 
     } 
     output.toSeq 
    } 
    ratings.topByKey(num)(Ordering.by(_._2)) 
    } 

recommendForAll方法はrecommendProductsForUsersメソッドから呼び出されます。

しかし、メソッドが1Mのタスクを回転させているように見えます。私は混乱して1Mのタスクを吐き出すようになったので、それが問題かもしれないと思います。

私の質問は、どうすればこの問題を解決できますか。このアプローチを使用しないと、実際にはoverlap ratioまたは[email protected]を計算するのは難しいです。これは、スパーク1.5(Clouderaの5.5)である

答えて

0

2GBの問題はスパークコミュニティに新しいものではありません。https://issues.apache.org/jira/browse/SPARK-6235

RE/2GB以上のパーティションサイズも大きく、より多くのパーティションを再作成するためにあなたのRDDを(myRdd.repartition(parallelism))をしてみてください(w/r/t /あなたの現在の並列度レベル)、各単一パーティションのサイズを縮小します。

私は、srcBlocks.cartesian(dstBlocks) API呼び出しから出てくる可能性があると仮定しています。これは、(z = srcBlocksのパーティション数* dstBlocksのパーティション数)パーティション。

この場合、repartitionの代わりにmyRdd.coalesce(parallelism) APIを利用することを検討し、シャッフル(およびシリーアライゼーション関連の問題のパーティション化)を避けることができます。

関連する問題