2016-08-23 8 views
4

2つの新しいパラメータを追加することによってクラスを変更する必要があります。このクラスはKryoとシリアライズされています。 私は現在、ストリームを停止するたびに、このクラスに関連する情報、特にRDDを維持しています。 ストリームを再起動すると、以前に保持した情報をロードし、停止した時点と再起動した時点の整合性を保つために使用します。Kryo:クラスの古いバージョンを非直列化する

クラスIはこれらの新しいパラメータを必要とするため、新しいパラメータとして新しいkryo.writeObject(output, object, ObjectSerializer)kryo.readObject(input, classOf[Object], ObjectSerializer)を追加してクラスとシリアライザを変更しました。

ここで、ストリームを再起動するたびに「Unregistered class ...」という例外が発生します。

ストリームを停止したときに保持していた情報に含まれていないオブジェクトを逆シリアル化しようとすると、明らかです。 これらのデータを削除し、以前の実行がない場合と同様にストリームを開始すると、例外は発生しません。

この例外を回避する手段はありますか? これらのパラメータがない場合のデフォルト値を指定することで

はありがとうございました

EDIT: Kryo issue 194:私は有益な何かを見つけ

私は前を見ていません。

この男は、使用するべきデシリアライザのバージョンを定義するだけで、バージョン管理を実装しました。 これは簡単な解決策ですが、私が開発しているコードを書いた会社は、互換性について考えていなかったので、新しいシリアライザの前に残っていたすべてのデータをウィンドウから取り除かなければならないと思います。

誰かがより良い解決策を提案できるかどうか教えてください。

EDIT 2:

はまだこのような状況で問題が発生しました。 ここで説明したようにCompatibleFieldSerializerを使用しようとしました:CompatibleFieldSerializer Example このシリアライザを登録して、以前使用したカスタムシリアライザを登録しないでください。 結果は、永続データをリロードするときにjava.lang.NullPointerExceptionとなります。 以前のデータが保存されていない場合でも問題はありません。ストリームを開始したり、新しいデータをシリアル化したり、ストリームを停止したり、デシリアライズしたり、ストリームを再開したりすることができます。 解像度についてはまだ手がかりがありません。

答えて

2

問題の解決策は数​​か月前に発見されました。だから私はできるだけ早くこの質問に答えを投稿すると思った。 問題は、コードの間違いのために、クラスが前方互換性のない標準のKryo FieldSerializerでシリアル化されていたという事実にありました。 古いクラスを逆シリアル化して新しい直列化クラスに変換するには、次の操作を実行する必要がありました。

状況でした:すべてのためのすべてのシリアライザを登録するためにループした

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     classA( 
      field1 = input.readLong(), 
      field2 = input.readLong() 
     ) 

とクラスを含む配列は、シリアライザでシリアル化する:

case class ClassA(field1 : Long, field2 : String) 

それは、次のように連載されましたクラス。

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    final def register(kryo: Kryo) = { 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

クラスは、別のケースクラスのインスタンスである新しいフィールドを追加することによって変更する必要がありました。

私たちは、「オプション」Kryoライブラリに関する注釈を使用していた、そのような変更を行うために、

... 
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional 
import scala.annotation.meta.field 
... 

case class ClassA(field1 : Long, field2 : String, @(Optional @field)("field3") field3 : ClassB) 

シリアライザは、このような古いシリアル化されたクラスを読み込むとき、それはしてフィールド3インスタンス化可能性があるので修正しましたkryoシリアライザ登録はまた、オプションのフィールドを登録するように変更された

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
     kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer) 

} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     ClassA( 
      field1 = input.readLong(), 
      field2 = input.readLong(), 
      field3 = ClassB.default 
     ) 

書き込む際には、そのようなデフォルト値を書き込むデフォルト値と、
protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    def optionals = Seq("field3") 

    final def register(kryo: Kryo) = { 
     optionals.foreach { optional => 
     kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) } 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

その結果、新しいバージョンのシリアライズされたクラスを作成することができました。 その後、オプションの注釈を削除し、シリアライザを変更して新しいシリアライズされたクラスから実フィールドを読み込み、オプションのシリアライザ登録を削除してレジストリSeqに追加する必要がありました。

その間、私たちはFieldSerializerによるシリアル化を強制したコードの間違いを修正しましたが、これは質問の範囲にはありません。

関連する問題