AvroシリアライザとスキーマレジストリでKafkaにオブジェクトを送信しようとしています。ここ
は単純化されたコードです:AvroシリアライザとスキーマレジストリでKafkaにメッセージを送信する方法
Properties props = new Properties();
...
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081");
Producer<String, User> producer = new KafkaProducer(properties);
User user = new User("name", "address", 123);
ProducerRecord record = new ProducerRecord<>(topic, key, user);
producer.send(record);
私はレジストリやオブジェクト(ユーザー)からのスキーマは、「舞台裏」読まれることを想定シリアライズさですが、私は以下のエラーを取得します。
私は何が欠けていますか?
スキーマを明示的に読み取り、GenericRecordを送信する必要がありますか?
org.apache.kafka.common.errors.SerializationException:エラーシリアル化によって引き起こさアブロメッセージ
:java.lang.IllegalArgumentExceptionが:サポートされていないアブロタイプ。サポートされているタイプは、io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:123)〜[kafka-avro-serializer]のnull、Boolean、Integer、Long、Float、Double、String、byte []およびIndexedRecord
です。 -3.3.0.jar!/ :?
io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73)〜[kafka-avro-serializer-3.3.0.jar!/ :?]
io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)〜[kafka-avro-serializer-3.3.0.jar!/ :?]
at org.apache.kafka.clients。 producer.KafkaProducer.send(KafkaProducer.java:424)〜[kafka-clients-0.9.0.1.jar!/ :?]
右。私は適切なavscファイルを作成し、それから、gradle-avro-pluginを使ってJavaファイルを生成した後、すべてがうまくいくように見えます。私は、スキーマがAvroによって自動的に作成されたと仮定しました。反射を使用して、私は間違っていました。 – msayag