2017-05-22 8 views
0

次のコードには「タスクはシリアル化できません」というエラーがありますか?Sparkアプリケーションで「タスクがシリアル化できません」というエラーが発生しましたか?

エラー

 
Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2101) 
     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 
     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.map(RDD.scala:369) 
     at ConnTest$.main(main.scala:41) 
     at ConnTest.main(main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: DoWork 
Serialization stack: 
     - object not serializable (class: DoWork, value: [email protected]) 
     - field (class: ConnTest$$anonfun$2, name: doWork$1, type: class DoWork) 
     - object (class ConnTest$$anonfun$2,) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
     ... 20 more 

コード:

object ConnTest extends App { 
    override def main(args: scala.Array[String]): Unit = { 
    super.main(args) 
    val date = args(0) 
    val conf = new SparkConf() 
    val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val jdbcSqlConn = "jdbc:sqlserver://......;" 

    val listJob = new ItemListJob(sqlContext, jdbcSqlConn) 
    val list = listJob.run(date).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect() 
    // It returns about 3000 rows 

    val doWork = new DoWork(sqlContext, jdbcSqlConn) 
    val processed = sc.parallelize(list).map(d => { 
     doWork.run(d, date) 
    }) 
    } 
} 

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) { 
    def run(date: LocalDate) = { 
    sqlContext.read.format("jdbc").options(Map(
     "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
     "url" -> jdbcSqlConn, 
     "dbtable" -> s"dbo.GetList('$date')" 
    )).load() 
    } 
} 

class DoWork(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) { 
    def run(id: Int, date: LocalDate) = { 
    // ...... read the data from database for id, and create a text file 
    val data = sqlContext.read.format("jdbc").options(Map(
     "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
     "url" -> jdbcSqlConn, 
     "dbtable" -> s"someFunction('$id', $date)" 
    )).load() 
    // .... create a text file with content of data 
    (id, date) 
    } 
} 

更新:

は、私は、

次のように .map()コールを変更
val processed = sc.parallelize(dealList).toDF.map(d => { 
    doWork.run(d(0).asInstanceOf[Int], rc) 
}) 

は、今私は

 
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate 
- field (class: "java.time.LocalDate", name: "_2") 
- root class: "scala.Tuple2" 
     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) 
     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) 
     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
     at scala.collection.immutable.List.foreach(List.scala:381) 

答えて

3

のエラーを得た問題で、次の閉鎖にあります、

val processed = sc.parallelize(list).map(d => { 
    doWork.run(d, date) 
}) 

mapで閉鎖が執行に実行されるスパークはdoWorkをシリアル化する必要がありますようにエグゼクティブにそれを送ってください。 DoWorkはシリアル化可能でなければなりません。しかしながら。 DoWorkにはscsqlContextが含まれているので、DoWorkSerializableを実装することはできません。エグゼキュータでは使用できないためです。

おそらく、データベースにデータを保存する場合は、DoWorkとしてください。その場合は、データフレームにRDDを変換し、jdbcメソッドを介してそれを保存することができ、例えば:

sc.parallelize(list).toDF.write.jdbc(...) 

私はあなたがDoWorkにコードを提供していないので、より多くの示唆を与えることはできません。

+0

ありがとうございました。 'DoWork.run()'関数はデータベースからデータを読み込んでテキストファイルを生成します。私はまだ関数本体のコードを書いていません。 – ca9163d9

+0

私は2つのクラスに対して 'sc:SparkContext'は必要ないことに気付きました。私は質問を更新しました。しかし、私はまだデータベースを読むためにsqlContextが必要でしょうか? – ca9163d9

+0

私のケースでは、データフレームのヘルプですか?あるいは、私は寄木細工のファイルをあらかじめ作成してSpark-SQLを使ってデータを取得し、テキストファイルを作成しなければなりませんでしたか? – ca9163d9

関連する問題