2017-10-25 23 views
2

私はStackOverflowで見つけたこの問題に対するすべての解決策を試しましたが、これにもかかわらず解決できません。 私は、 "Recommendation"オブジェクトをインスタンス化する "MainObj"オブジェクトを持っています。私が "recommendationProducts"メソッドを呼び出すと、私はいつもエラーが発生します。ここで は、メソッドのコードです:スパークタスクがシリアライズ不可能

def recommendationProducts(item: Int): Unit = { 

val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0)) 

def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = { 
    vec1.dot(vec2)/(vec1.norm2() * vec2.norm2()) 
} 

val itemFactor = model.productFeatures.lookup(item).head 
val itemVector = new DoubleMatrix(itemFactor) 

//Here is where I get the error: 
val sims = model.productFeatures.map { case (id, factor) => 
    val factorVector = new DoubleMatrix(factor) 
    val sim = cosineSimilarity(factorVector, itemVector) 
    (id, sim) 
} 

val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double] { 
    case (id, similarity) => similarity 
}) 

println("\nTop 10 products:") 
sortedSims.map(x => (x._1, x._2)).foreach(println) 

これはエラーです:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.map(RDD.scala:369) 
at RecommendationObj.recommendationProducts(RecommendationObj.scala:269) 
at MainObj$.analisiIUNGO(MainObj.scala:257) 
at MainObj$.menu(MainObj.scala:54) 
at MainObj$.main(MainObj.scala:37) 
at MainObj.main(MainObj.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
Serialization stack: 
- object not serializable (class: org.apache.spark.SparkContext, value: [email protected]) 
- field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext) 
- object (class MainObj$$anon$1, [email protected]) 
- field (class: RecommendationObj$$anonfun$37, name: $outer, type: class RecommendationObj) 
- object (class RecommendationObj$$anonfun$37, <function1>) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
... 14 more 

は、私が追加しようとしました: 1)」)私のクラス 2(スカラ座) "Serializableを拡張します"このクラス内でモデル(および他の機能)を取得します(今は別のオブジェクトからそれらを取得し、私はそれらを私に渡します)。クラスのような引数)

どうすれば解決できますか?私は狂っているよ! ありがとうございます!

答えて

3

鍵はここにある:

field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext) 

ですから、タイプSparkContextのフィールドという名前のSCを持っています。 Sparkはクラスをシリアライズしたいので、すべてのフィールドをシリアライズしようとします。

あなたがする必要があります

  • 使用@Transient注釈とヌルかどうかをチェックし、フィールドからSparkContextを使用しますが、メソッドの引数に入れていない
  • を再作成します。ただし、マップ、フラットマップなどのクロージャ内ではSparkContextを使用しないでください。
+0

ありがとうございました!できます!しかし、私はクラスの引数のようなSparkContext(sc)を渡し、私はコンストラクタでモデルをロードするためにそれを使用します。違いますか? –

+0

@ S.SPそれがあなたを助けたならば、投票して答えを受け入れてください。間違っているわけではありませんが、シリアライザがシリアル化しないように '@transient'アノテーションを使用する必要があります –

+0

Ok!ありがとうございました。私はあなたに投票したいですが、私はまだそれを行うには15の評判のポイントがありません。あなたの答えが私をとても助けてくれたので、すみません! –

関連する問題