2016-03-22 11 views
8

私は、read()write()のメソッドをcom.esotericsoftware.kryo.Serializer(以下の例を参照)から実装することで、カスタムKryoシリアライザを実装するクラスを持っています。 このカスタムシリアライザはどのようにSparkに登録できますか?ここでSpark Kryo:カスタムシリアライザを登録する

は、私が持っているもののsudoのコードの例です:

スパークで今
class A() 

CustomASerializer extends com.esotericsoftware.kryo.Serializer[A]{ 
    override def write(kryo: Kryo, output: Output, a: A): Unit = ??? 
    override def read(kryo: Kryo, input: Input, t: Class[A]): A = ??? 
} 

val kryo: Kryo = ... 
kryo.register(classOf[A], new CustomASerializer()); // I can register my serializer 

val sparkConf = new SparkConf() 
sparkConf.registerKryoClasses(Array(classOf[A])) 

残念ながら、スパークは私に私のカスタム・シリアライザを登録するためのオプションを与えるものではありません。これを行う方法があれば、どんな考えですか?

+0

spark.kryo.classesToRegister – Sohaib

+1

を見[この回答](http://stackoverflow.com/questions/32667068/save-spark-dataframe-into-elasticsearch-can-t-handle-type -exception)はあなたの質問に対する直接的な答えではありませんが、提供された説明はsparkでのカスタムシリアライザ登録の詳細を示します。 – eliasah

答えて

11

登録このカスタム・シリアライザとKryoRegistrator独自に作成します。例えば、次に

package com.acme 

class MyRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) { 
    kryo.register(classOf[A], new CustomASerializer()) 
    } 
} 

あなたのレジの完全修飾名にspark.kryo.registratorを設定com.acme.MyRegistrator

val conf = new SparkConf() 
conf.set("spark.kryo.registrator", "com.acme.KryoRegistrator") 
+0

これは問題を解決します。ありがとう! – marios

+1

これはsparkのドキュメントではあまり明確ではありませんが、これは絶対に機能します。 KryoがSparkで引数なしのコンストラクタを使ってクラスを直列化できないという問題に遭遇した場合(これはorg.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering)、これで問題を解決できますkryo.register(LazilyGeneratedOrdering.class、new JavaSerializer())を使用します。ありがとうございました! – jhnclvr

関連する問題