2017-06-25 1 views
1

私はこのような1つのデータフレームがあります。スパークスカラ:タスクserializbleない

val temp = sc.parallelize(Seq(Array(43,53,266),Array(69,160,166),Array(266))) 
.toDF("value") 

を、私は次の配列と交差する行を選択します:

val goodValue = Array(231, 266) 
val broadcastGood = sc.broadcast(goodValue) 
val containGood = udf((array:scala.collection.mutable.WrappedArray[Int]) => 
broadcastGood.value.intersect(array).size>0) 

そしてIこのUDFを試してみました、

display(temp.filter(containGood(col("value")))) 

私は悪名高いエラーを得た:タスクはSerializableない

奇妙なことは、これが私のためにうまくいったことです。私は何が変わったのかわかりません。本当に助けに感謝します。

編集: 実際には上記のコードだけでOKですが、ブロードキャスト変数は必要ありません。あなたの中には、 "値の1つがSerializableではないScalaクラスの中にある"と言われましたが、これが問題であるはずですが、私はそれを解決する方法がわかりません。

ここに背景情報はあります:私は、コーパス上のトピック分析を実行するために潜在的ディリクレ配分(LDA)を使用しています:

val ldaModel = lda.fit(dfVectorizer) 

dfVectorizerが私の元のデータセットのベクトル化バージョンです。このLDAモデルでは、私は、次のデータセットを生成します。

val topic = ldaModel.describeTopics(50) //with three columns[topic:int, termIndices: array<Int>, termWeights: array<Double>] 
val interestTerms = Seq(1,2,3,4,5,6,7) 
val interestUDF = udf((terms:Seq[Int]) =>terms.filter(r=>interestTerms.contains(r))) 
val topicTmp = topic.withColumn("InterestTerm",interestUDF(col("termIndices"))) 
val sumVec = udf((terms: Seq[Int]) => terms.sum) 
val topicDF = topicTmp.select('topic,sumVec('InterestTerm).as('order)).sort('order.desc) 

だから、最後のデータフレーム「topicDF」は次のようになります。

Topic | Order 

111 | 7 

69 | 7 

248 | 5 

...... 

しかし、私はこのような単純なフィルタを実行しようとしていた場合:

display(topicDF.filter("order>3")) 

エラーは"task not Serializable"になります。エラーメッセージでは、それは「によって引き起こされる」非常に明確にこれをされることを指定し

java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel.

エラーメッセージは次のようになります。

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840) 
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:133) 
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807) 
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791) 
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87) 
at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53) 
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) 
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790) 
at org.apache.spark.sql.Dataset.head(Dataset.scala:2132) 
at org.apache.spark.sql.Dataset.take(Dataset.scala:2345) 
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81) 
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:263) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1$$anonfun$apply$1.apply(ScalaDriverLocal.scala:254) 
at scala.Option.map(Option.scala:145) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:254) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$getResultBuffer$1.apply(ScalaDriverLocal.scala:228) 
at scala.Option.map(Option.scala:145) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal.getResultBuffer(ScalaDriverLocal.scala:228) 
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:209) 
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230) 
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211) 
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168) 
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39) 
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206) 
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39) 
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211) 
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589) 
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589) 
at scala.util.Try$.apply(Try.scala:161) 
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584) 
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488) 
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391) 
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348) 
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel 
Serialization stack: 
- object not serializable (class: 
org.apache.spark.mllib.clustering.DistributedLDAModel, value: 
[email protected]) 
- writeObject data (class: scala.collection.mutable.HashMap) 
- object (class scala.collection.mutable.HashMap, Map(lda_1da3e45afeaa__subsamplingRate -> 0.05, lda_1da3e45afeaa__k -> 320, lda_1da3e45afeaa__keepLastCheckpoint -> true, lda_1da3e45afeaa__maxIter -> 100, lda_1da3e45afeaa__optimizer -> em, lda_1da3e45afeaa__optimizeDocConcentration -> true, lda_1da3e45afeaa__learningDecay -> 0.51, lda_1da3e45afeaa__topicConcentration -> 1.1, lda_1da3e45afeaa__learningOffset -> 1024.0, lda_1da3e45afeaa__checkpointInterval -> 10, lda_1da3e45afeaa__featuresCol -> features, lda_1da3e45afeaa__seed -> 12345, lda_1da3e45afeaa__docConcentration -> [[email protected], lda_1da3e45afeaa__topicDistributionCol -> topicDistribution)) 
- field (class: org.apache.spark.ml.param.ParamMap, name: org$apache$spark$ml$param$ParamMap$$map, type: interface scala.collection.mutable.Map) 

はどうもありがとうございました!

+0

どのようなスパークバージョンを使用していますか? – mtoto

+0

このコードは、Spark 2.1を使って私にとってうまく動作します –

+2

定義した値の1つが、SerializableではないScalaクラス内にあるとします。 Scalaファイル全体を提供できますか? –

答えて

0

おそらく、クラス内でこのクラスを使用していて、クラスがマップ処理を行っている場合は、すべてのマップからの戻り値をシリアル化する必要があります。あなたのマッパークラスをシリアライズしていないと仮定しています。

Class XYZ extends serializable 

あなたのUDFを定義しているクラスでこれを試してみてください。

+0

質問の説明にいくつかの情報を追加します。ご覧のとおり、問題はspark mllib DistributedLDAModelです。私はそれを回避する方法がわかりません..ありがとう! –

関連する問題