2017-01-13 8 views
2

これは不適切な形式が使用されている場合、SOと私の謝罪に関する私の最初の投稿です。Spark java.io.NotSerializableExceptionの理解

私はApache Sparkを使って新しいソース(DefaultSource経由)、BaseRelationsなどを作成しています。シリアライズの問題にぶつかって、よく理解したいと思います。以下では、BaseRelationを拡張し、スキャンビルダを実装するクラスを検討します。

class RootTableScan(path: String, treeName: String)(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan{ 


    private val att: core.SRType = 
    { 
     val reader = new RootFileReader(new java.io.File(Seq(path) head)) 
     val tmp = 
     if (treeName==null) 
      buildATT(findTree(reader.getTopDir), arrangeStreamers(reader), null) 
     else 
      buildATT(reader.getKey(treeName).getObject.asInstanceOf[TTree], 
      arrangeStreamers(reader), null) 
     tmp 
    } 

    // define the schema from the AST 
    def schema: StructType = { 
     val s = buildSparkSchema(att) 
     s 
    } 

    // builds a scan 
    def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { 

     // parallelize over all the files 
     val r = sqlContext.sparkContext.parallelize(Seq(path), 1). 
     flatMap({fileName => 
      val reader = new RootFileReader(new java.io.File(fileName)) 
      // get the TTree 
      /* PROBLEM !!! */ 
      val rootTree = 
//   findTree(reader) 
      if (treeName == null) findTree(reader) 
      else reader.getKey(treeName).getObject.asInstanceOf[TTree] 
      new RootTreeIterator(rootTree, arrangeStreamers(reader), 
      requiredColumns, filters) 
     }) 

     println("Done building Scan") 
     r 
    } 
    } 
} 

問題がどこで発生するのかを特定します。 treeNameは、コンストラクタを介してクラスに注入されるvalです。それを使用するラムダはスレーブ上で実行されるはずです。私はtreeNameを送信する必要があります - それをシリアル化します。私は以下のコードスニペットがなぜこのNotSerializableExceptionを引き起こすのかを理解したいと思います。その中にTREENAMEせず、それは以下

val rootTree = 
     // findTree(reader) 
     if (treeName == null) findTree(reader) 
     else reader.getKey(treeName).getObject.asInstanceOf[TTree] 

が、私はそれが私のラムダをシリアル化しようとすることができますことを推測することができると思いスタックからスタックトレース

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2056) 
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:375) 
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:374) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:374) 
    at org.dianahep.sparkroot.package$RootTableScan.buildScan(sparkroot.scala:95) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$8.apply(DataSourceStrategy.scala:260) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$8.apply(DataSourceStrategy.scala:260) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:303) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:302) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:379) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:298) 
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:256) 
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60) 
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61) 
    at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47) 
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51) 
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298) 
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48) 
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1934) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2149) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 50 elided 
Caused by: java.io.NotSerializableException: org.dianahep.sparkroot.package$RootTableScan 
Serialization stack: 
    - object not serializable (class: org.dianahep.sparkroot.package$RootTableScan, value: [email protected]) 
    - field (class: org.dianahep.sparkroot.package$RootTableScan$$anonfun$1, name: $outer, type: class org.dianahep.sparkroot.package$RootTableScan) 
    - object (class org.dianahep.sparkroot.package$RootTableScan$$anonfun$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 

あるだけで正常に動作することを私は確かに知っていますない。このラムダは、ラムダスコープの外側で定義されたvalを持つので、クロージャでなければなりません。しかし、なぜこれをシリアル化できないのか分かりません。

本当にありがとうございます! ありがとう!

+0

「findTree」はどこに定義されていますか? – Tim

+0

'DEF findTree(DIR:TDirectory):TTreeは= //ツリー {(iは< - dir.nKeysまで0)を見つける{ ヴァルOBJ = dir.getKey(I).getObject.asInstanceOf [AbstractRootObject] をIF(obj.getRootClass.getClassName == "TDirectory" || obj.getRootClass.getClassName == "TTree") {IF(obj.getRootClass.getClassName == "TDirectory") リターンfindTree(obj.asInstanceOf [ TDirectory])他の (obj.getRootClass.getClassName == "TTree") リターンobj.asInstanceOf [TTree] }} ヌル } ' –

+0

ああ、私は参照してください。 – Tim

答えて

1

スカラクロージャがtreeNameのようなクラス変数を参照すると、JVMはクロージャとともに親クラスを直列化します。あなたのクラスRootTableScanはシリアライズ可能ではありません!解決策はローカル文字列変数を作成することです:

// builds a scan 
    def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { 

     val localTreeName = treeName // this is safe to serialize 

     // parallelize over all the files 
     val r = sqlContext.sparkContext.parallelize(Seq(path), 1). 
     flatMap({fileName => 
      val reader = new RootFileReader(new java.io.File(fileName)) 
      // get the TTree 
      /* PROBLEM !!! */ 
      val rootTree = 
//   findTree(reader) 
      if (localTreeName == null) findTree(reader) 
      else reader.getKey(localTreeName).getObject.asInstanceOf[TTree] 
      new RootTreeIterator(rootTree, arrangeStreamers(reader), 
      requiredColumns, filters) 
     }) 
+0

ありがとうTim!これはすべてを非常にはっきりと説明しています! –

関連する問題