2016-12-30 9 views
2

スパークストリーミングでKryoシリアライザを使用しようとしています。私はそのSpark tuning docsで読む - あなたのカスタムクラスを登録しない場合

最後に、Kryoはまだ 動作しますが、それは無駄である 、各オブジェクトとの完全なクラス名を保存する必要があります。

私はすべてのクラスを登録しようとしています。

trait Message extends java.io.Serializable 

object MutableTypes { 
    type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)] 
    type Parents = scala.collection.mutable.Map[Int, Childs] 
} 

case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message 

そして、私はこのようなクラスを登録しています - - 私の場合クラスがある

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    .set("spark.kryo.registrationRequired","true") 
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord])) 

が、私はこの例外だ:

com.esotericsoftware.kryo.KryoException: れるjava.langを.IllegalArgumentException:クラスが登録されていません: scala.Tuple2 $ mcJZ $ sp注:このクラスを登録するには、次のようにします: kryo.register(scala.Tuple2 $ mcJZ $ sp.class);シリアル化トレース: com.esotericsoftware.kryo.serializers.FieldSerializer.writeで com.esotericsoftware.kryo.serializers.FieldSerializer $ ObjectField.write(FieldSerializer.java:585) (FieldSerializerで parents_to_add(com.test.IncomingRecord) .javaファイル:213) org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scalaでcom.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
で :194) 組織で。 (DiskBlockObjectWriter.scala:185) でorg.apache.spark.util.collection.WritablePartitionedPairCollection $$アノン$ 1.writeNext(WritablePartitionedPairCollection.scala:56) org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)で でorg.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72) でorg.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) org.apacheました。 spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) (org.apache.spark.scheduler.Task.run(Task.scala:89)) org.apache.spark.executor.Executor $ TaskRunner.run Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread。 java:745)

クラスはどのように登録できますか?これを解決するには?

アップデート:私は例外を削除します偽registerationを回して知っているが、それが原因余分なオーバーヘッドにそのくらいのパフォーマンスを追加しません

。私はどのように私のクラスを登録することができます知りたい。

+0

ない 'classOf [Tuple2 [ロングは、ブール]]'の専門を選択してくださいクラス。あなたはその表現の警告を受け取りませんか?たぶん、特殊化せずに 'case class'で置き換えることができます。 –

+0

警告が表示されない –

答えて

0

あなたのカスタムクラスを登録しない場合は最後に、Kryoはまだ 動作しますが、それは無駄である 、各オブジェクトとの完全なクラス名を保存する必要があります。 spark.kryo.registrationRequiredのデフォルト値を使用している場合にのみ これは(偽である)」真である

例外の問題を解決する必要があり、次の(またはこのパラメータに任意の値を設定することを回避しているデフォルト値を使用して偽)

.set("spark.kryo.registrationRequired","false") 

詳細情報はここで見つけることができます:http://spark.apache.org/docs/latest/configuration.html

はKryoの登録を要求するかどうか(デフォルト値)偽spark.kryo.registrationRequired '真'、クリプトンに設定されている場合。登録されていないクラスがシリアライズされた場合、例外がスローされます。 False(デフォルト)に設定されている場合、Kryoは登録されていないクラス名を各オブジェクトと共に書き込みます。クラス名を書くと、パフォーマンス上のオーバーヘッドが大きくなる可能性があるため、このオプションを有効にすると、ユーザーがクラスを登録から除外していないことが厳密に強制できます。

いくつかのポイント - kryoシリアライズを登録する方法:

+0

パフォーマンスを向上させて、すべてのクラスを登録したいと考えています。 –

+0

@NishantKumar - フラグを "false"に設定すると例外が取り除かれます。カイロのシリアル化に役立ついくつかのリンクを追加しました。 – Yaron

+0

Sparkストリーミングウェブサイトの引用文を投稿に追加しました。私は誤って例外が削除されることは知っていますが、それは公式文書によれば無駄になり、オーバーヘッドのためにパフォーマンスが向上しません。私はどのように私はすべてのクラスを登録することができます知りたい。 –

関連する問題