スパークストリーミングでKryoシリアライザを使用しようとしています。私はそのSpark tuning docsで読む - あなたのカスタムクラスを登録しない場合
最後に、Kryoはまだ 動作しますが、それは無駄である 、各オブジェクトとの完全なクラス名を保存する必要があります。
私はすべてのクラスを登録しようとしています。
trait Message extends java.io.Serializable
object MutableTypes {
type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
type Parents = scala.collection.mutable.Map[Int, Childs]
}
case class IncomingRecord(id_1: String, id_raw: String, parents_to_add: MutableTypes.Parents, parents_to_delete: MutableTypes.Parents) extends Message
そして、私はこのようなクラスを登録しています - - 私の場合クラスがある
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired","true")
sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long,Boolean]],classOf[IncomingRecord]))
が、私はこの例外だ:
com.esotericsoftware.kryo.KryoException: れるjava.langを.IllegalArgumentException:クラスが登録されていません: scala.Tuple2 $ mcJZ $ sp注:このクラスを登録するには、次のようにします: kryo.register(scala.Tuple2 $ mcJZ $ sp.class);シリアル化トレース: com.esotericsoftware.kryo.serializers.FieldSerializer.writeで com.esotericsoftware.kryo.serializers.FieldSerializer $ ObjectField.write(FieldSerializer.java:585) (FieldSerializerで parents_to_add(com.test.IncomingRecord) .javaファイル:213) org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scalaでcom.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
で :194) 組織で。 (DiskBlockObjectWriter.scala:185) でorg.apache.spark.util.collection.WritablePartitionedPairCollection $$アノン$ 1.writeNext(WritablePartitionedPairCollection.scala:56) org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)で でorg.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72) でorg.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) org.apacheました。 spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) (org.apache.spark.scheduler.Task.run(Task.scala:89)) org.apache.spark.executor.Executor $ TaskRunner.run Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread。 java:745)
クラスはどのように登録できますか?これを解決するには?
アップデート:私は例外を削除します偽registerationを回して知っているが、それが原因余分なオーバーヘッドにそのくらいのパフォーマンスを追加しません
。私はどのように私のクラスを登録することができます知りたい。
ない 'classOf [Tuple2 [ロングは、ブール]]'の専門を選択してくださいクラス。あなたはその表現の警告を受け取りませんか?たぶん、特殊化せずに 'case class'で置き換えることができます。 –
警告が表示されない –