免責事項:Sparkで再生を開始したばかりです。Sparkの終了とそのシリアル化の理解
私は有名な「タスクはシリアライズできない」例外を理解するのに苦労していますが、私の質問はSOに見られるものとは少し異なります。
私は小さなカスタムRDD(TestRDD
)を持っています。クラスにSerializable(NonSerializable
)を実装していないオブジェクトを格納するフィールドがあります。私はKryoを使うために "spark.serializer"設定オプションを設定しました。私はそれがJavaシリアライザ、ないKryoである私のRDD、上の閉鎖シリアライザを使用していることがわかり内側DAGScheduler.submitMissingTasks
見ると
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: [email protected])
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
:私は私のRDDにcount()
をしようとすると、しかし、私は次の取得します私が期待しているシリアライザ。私はKryoにクロージャのシリアライズに関する問題があり、Sparkはクロージャ用のJavaシリアライザを常に使用していますが、ここでクロージャがどのように動作するのかはよく分かりません。 、何のマッパーやクロージャのシリアル化が必要なことは何もありません
SparkConf conf = new SparkConf()
.setAppName("ScanTest")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
:私はここでやっているすべてはこれです。大藤これは動作します:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
Kryoシリアライザは予想通り、閉鎖シリアライザが関与していない使用されています。シリアライザプロパティをKryoに設定しなかった場合は、ここでも例外が発生します。
私は、クロージャがどこから来たのかを説明するポインタと、Kryoを使ってカスタムRDDをシリアル化する方法を説明してくれてありがとうと思います。
UPDATE:私はDAGScheduler.submitMissingTasks
中を見たとき、私はそれがJavaのシリアライザである私のRDDの その閉鎖シリアライザを使用していることがわかり
class TestRDD extends RDD<String> {
private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
NonSerializable mNS = new NonSerializable();
public TestRDD(final SparkContext _sc) {
super(_sc,
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
STRING_TAG);
}
@Override
public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
"test_" + thePartition.index(),
"test_" + thePartition.index()).iterator()).asScala();
}
@Override
public Partition[] getPartitions() {
return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
}
static class TestPartition implements Partition {
final int mIndex;
public TestPartition(final int theIndex) {
mIndex = theIndex;
}
public int index() {
return mIndex;
}
}
}
あなたの 'TestRDD'に' SparkContext'を保持するフィールドがありますか? 'TestRDD'の定義を表示するか、そこに[MCVE] –
@ YuvalItzchakovを作成してください。 'SparkContext'がsuperのコンストラクタに渡されるので、RDDはそれを保持します。例外はそれについて不平を言っていないようです。 –
'NonSerializable'を投稿できますか? –