2016-10-26 9 views
15

免責事項:Sparkで再生を開始したばかりです。Sparkの終了とそのシリアル化の理解

私は有名な「タスクはシリアライズできない」例外を理解するのに苦労していますが、私の質問はSOに見られるものとは少し異なります。

私は小さなカスタムRDD(TestRDD)を持っています。クラスにSerializable(NonSerializable)を実装していないオブジェクトを格納するフィールドがあります。私はKryoを使うために "spark.serializer"設定オプションを設定しました。私はそれがJavaシリアライザ、ないKryoである私のRDD、上の閉鎖シリアライザを使用していることがわかり内側DAGScheduler.submitMissingTasks見ると

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable 
Serialization stack: 
- object not serializable (class: com.test.spark.NonSerializable, value: [email protected]) 
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable) 
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28) 
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>)) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009) 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933) 

:私は私のRDDにcount()をしようとすると、しかし、私は次の取得します私が期待しているシリアライザ。私はKryoにクロージャのシリアライズに関する問題があり、Sparkはクロージャ用のJavaシリアライザを常に使用していますが、ここでクロージャがどのように動作するのかはよく分かりません。 、何のマッパーやクロージャのシリアル化が必要なことは何もありません

SparkConf conf = new SparkConf() 
         .setAppName("ScanTest") 
         .setMaster("local") 
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 

JavaSparkContext sc = new JavaSparkContext(conf); 

TestRDD rdd = new TestRDD(sc.sc()); 
System.err.println(rdd.count()); 

:私はここでやっているすべてはこれです。大藤これは動作します:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count() 

Kryoシリアライザは予想通り、閉鎖シリアライザが関与していない使用されています。シリアライザプロパティをKryoに設定しなかった場合は、ここでも例外が発生します。

私は、クロージャがどこから来たのかを説明するポインタと、Kryoを使ってカスタムRDDをシリアル化する方法を説明してくれてありがとうと思います。

UPDATE:私はDAGScheduler.submitMissingTasks中を見たとき、私はそれがJavaのシリアライザである私のRDDの その閉鎖シリアライザを使用していることがわかり

class TestRDD extends RDD<String> { 

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class); 

    NonSerializable mNS = new NonSerializable(); 

    public TestRDD(final SparkContext _sc) { 
     super(_sc, 
       JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()), 
       STRING_TAG); 
    } 

    @Override 
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) { 
     return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(), 
                    "test_" + thePartition.index(), 
                    "test_" + thePartition.index()).iterator()).asScala(); 
    } 

    @Override 
    public Partition[] getPartitions() { 
     return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)}; 
    } 

    static class TestPartition implements Partition { 

     final int mIndex; 

     public TestPartition(final int theIndex) { 
      mIndex = theIndex; 
     } 

     public int index() { 
      return mIndex; 
     } 
    } 
} 
+0

あなたの 'TestRDD'に' SparkContext'を保持するフィールドがありますか? 'TestRDD'の定義を表示するか、そこに[MCVE] –

+0

@ YuvalItzchakovを作成してください。 'SparkContext'がsuperのコンストラクタに渡されるので、RDDはそれを保持します。例外はそれについて不平を言っていないようです。 –

+0

'NonSerializable'を投稿できますか? –

答えて

7

:ここTestRDDは、その非直列化可能フィールドmNSでです、私が期待しているのは、 のKryoシリアライザではありません。

SparkEnv 2つのシリアライザ、あなたのデータのシリアル化のために使用されているserializerという名前の1、チェックポイント、労働者間のメッセージングなどをサポートし、spark.serializer設定フラグで利用することができます。もう1つはclosureSerializerspark.closure.serializerと呼ばれ、オブジェクトが実際にシリアル化可能であり、Spark < = 1.6.2(ただし、JavaSerializerは実際に動作しません)の設定が可能で、2.0.0以上からJavaSerializerにハードコードされていることを確認します。

Kryo閉鎖シリアライザは、それが使用できなくなるバグを持っている、あなたは(これはKryo 3.0.0で固定してもよいが、スパークが現在Kryoに固定されているチルの特定のバージョンに固定されているSPARK-7708の下でそのバグを見ることができます2.2.1)。さらに、Spark 2.0.xの場合、JavaSerializerは設定可能ではなく固定されています(これはin this pull requestです)。つまり、効果的にクロージャのシリアライゼーションにはJavaSerializerが付いています。

私たちは、シリアライザを使用して作業を送信したり、作業者などの間でデータをシリアル化したりするのは奇妙ですか?間違いなく、これは私たちが持っているものです。

spark.serializer設定を設定している場合、またはSparkContext.registerKryoClassesを使用している場合は、Sparkのほとんどのシリアル化にKryoを利用することになります。ある特定のクラスがシリアライズ可能かどうかを確認するために、と作業者の作業をシリアル化すると、SparkはJavaSerializerを使用します。

+0

ありがとうございます、どうやって間違っていますか? 'DAGScheduler'は' serializer'フィールドではなく 'closureSerializer'フィールドを使用しています。私がKryoを使用する環境を設定しても、SparkEnv.get.closureSerializerはJavaのシリアライザ(iirc、それは無視されていたため、2.0のうち 'spark.closure.serializer'を引っ張ってしまった)それが失敗する理由を見てください。質問は違う:私のケースでスケジューラがクロージャシリアライザを使うのはなぜですか?私のRDD> –

+0

hmにKryoを使用するにはどうすればいいですか?この文は、Spark 2.0.0と2.0.1(スタックトレースから明らかです)には正しいと思います。 SPARK-12414をチェックすることもできます。 'closureSerializer'は抽象型であるかもしれませんが、AFAICTは1つの実装のみが使用されます。 –

+0

@PavelKlinovあなたが正しいです。私は少し深く掘った、私の更新を参照してください。 –

関連する問題