2016-12-04 7 views
0

スパムRDD操作にラムダが渡されたとき、その範囲外のオブジェクトを参照する場合、分散実行用のシリアル化タスクを作成するのに必要なコンテキストが含まれます。以下の簡単な例では、なぜ乗数だけではなく、OutClassインスタンス全体を直列化することにしましたか?私は乗数が実際にはScala getterメソッドであることを疑っていたので、クラスの参照を含める必要があります。宣言OuterClassはSerializableを拡張しますが、不要な制約が導入されます。私は、OuterClassをシリアライズ可能と宣言することなく動作させる方法を本当に感謝しています。ここで スパークタスクのシリアライゼーションとクロージャ

object ClosureTest { 
    def main(args: Array[String]): Unit = { 
    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("test")) 
    println(new OuterClass(10).sparkSumProd(sc.parallelize(Seq(1,2,3)))) 
    } 
    class OuterClass(multiplier: Int) { 
    def sparkSumProd(data: RDD[Int]): Double = { 
     data.map{ 
     v => v * multiplier 
     }.sum() 
    } 
    } 
} 

はちょうどそれが動作しますローカル変数にクラスレベルの変数を割り当てるスパークのSerializationDebugger

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2056) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:365) 
    at ClosureTest$OuterClass.sparkSumProd(ClosureTest.scala:14) 
    at ClosureTest$.main(ClosureTest.scala:10) 
    at ClosureTest.main(ClosureTest.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 
Caused by: java.io.NotSerializableException: ClosureTest$OuterClass 
Serialization stack: 
    - object not serializable (class: ClosureTest$OuterClass, value: [email protected]) 
    - field (class: ClosureTest$OuterClass$$anonfun$sparkSumProd$1, name: $outer, type: class ClosureTest$OuterClass) 
    - object (class ClosureTest$OuterClass$$anonfun$sparkSumProd$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
    ... 17 more 

答えて

0

から出力されます。

object ClosureTest { 
    def main(args: Array[String]): Unit = { 
    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("test")) 
    println(new OuterClass(10).sparkSumProd(sc.parallelize(Seq(1,2,3)))) 
    } 
    class OuterClass(multiplier: Int) { 
    def sparkSumProd(data: RDD[Int]): Double = { 
     val m = multiplier 
     data.map{ 
     v => v * m 
     }.sum() 
    } 
    } 
} 
関連する問題