2017-01-21 1 views
0

環境:spark 1.60。私はスカラを使用します。 私はsbtでプログラムをコンパイルできますが、プログラムをコミットするとエラーが発生します。以下のように 私の完全なエラーは次のとおりです。タスクはaggegateByKeyについてシリアル化できません

238 17/01/21 18:32:24 INFO net.NetworkTopology: Adding a new node: /YH11070029/10.39.0.213:50010 
17/01/21 18:32:24 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.39.0.44:41961 with 2.7 GB RAM, BlockManagerId(349, 10.39.0.44, 41961) 
17/01/21 18:32:24 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.39.2.178:48591 with 2.7 GB RAM, BlockManagerId(518, 10.39.2.178, 48591) 
Exception in thread "main" org.apache.spark.SparkException: Task not  serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:93) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:82) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:82) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:177) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:166) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:166) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$3.apply(PairRDDFunctions.scala:206) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$3.apply(PairRDDFunctions.scala:206) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:205) 
    at com.sina.adalgo.feature.ETL$$anonfun$13.apply(ETL.scala:190) 
    at com.sina.adalgo.feature.ETL$$anonfun$13.apply(ETL.scala:102) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 

コードの目的は、カテゴリ機能frequentenciesを統計することです。メインコードは以下のとおりです。

object ETL extends Serializable { 
      ... ... 


val cateList = featureData.map{v => 
    case (psid: String, label: String, cate_features: ParArray[String], media_features: String) => 
     val pair_feature = cate_features.zipWithIndex.map(x => (x._2, x._1)) 
     pair_feature 
}.flatMap(_.toList) 

def seqop(m: HashMap[String, Int] , s: String) : HashMap[String, Int]={ 
    var x = m.getOrElse(s, 0) 
    x += 1 
    m += s -> x 
    m 
} 

def combop(m: HashMap[String, Int], n: HashMap[String, Int]) : HashMap[String, Int]={ 
    for (k <- n) { 
     var x = m.getOrElse(k._1, 0) 
     x += k._2 
     m += k._1 -> x 
    } 
    m 
} 

val hash = HashMap[String, Int]() 
val feaFreq = cateList.aggregateByKey(hash)(seqop, combop)// (i, HashMap[String, Int]) i corresponded with categorical feature 

オブジェクトはSerializableを継承しています。 なぜですか?私を助けることができますか?

+5

コードを追加できますか?例外だけで問題の原因を見ることはできません。 – maasg

+2

"タスクはシリアル化できません"。独自のコードにシリアル化可能でないオブジェクトがあるかどうかを検出します。 –

+0

コードが表示されます。私はオブジェクトがシリアライズ可能であることを確認しました。 –

答えて

0

この問題は、私たちがいくつかの不要なオブジェクトやスパークドライバコードのメインクラスの内部にある関数を意図的に閉じてしまう集約関数としてクロージャを使用すると、通常Sparkで発生します。

スタックトレースにはトップレベルの犯人としてorg.apache.spark.util.ClosureCleanerが含まれているため、この場合となります。

このような場合、Sparkが実際の集計を行うことができるようにSparkがその機能をワーカーに転送しようとすると、実際に意図したよりもはるかに多くの機能がシリアル化されます。

も参照してください。this post by Erik Erlandsonクロージャのシリアライゼーションのいくつかの境界ケースがよく説明されており、Spark 1.6 notes on closuresもあります。

おそらく、aggregateByKeyで使用する関数の定義を、コードの残りの部分とは完全に独立した別のオブジェクトに移動するのがおそらく簡単です。

+0

tks。私はあなたが別のオブジェクトにaggregateByKeyで使用する関数の定義を移動しようとすると、それは動作しませんでした。私のコードが掲載されており、ハッシュについて何か間違っているとは思わない(HashMap [String、Int]) –

+0

フィードバックとコードをありがとう。 'seqop'と' combop'を別のオブジェクトに移動してみてください。そのオブジェクトには何も他にはなく、 'ETL'の外に移動してください。私はここで起こっているのは、 'ETL'オブジェクト全体が直列化に引き込まれ、' cateList'と 'feaFreq'がRDDであるため、直列化が失敗するということです。 – Svend

関連する問題