0

AvroシリアライザとスキーマレジストリでKafkaにオブジェクトを送信しようとしています。ここ
は単純化されたコードです:AvroシリアライザとスキーマレジストリでKafkaにメッセージを送信する方法

Properties props = new Properties(); 
    ... 
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); 
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081"); 

    Producer<String, User> producer = new KafkaProducer(properties); 

    User user = new User("name", "address", 123); 
    ProducerRecord record = new ProducerRecord<>(topic, key, user); 
    producer.send(record); 

私はレジストリやオブジェクト(ユーザー)からのスキーマは、「舞台裏」読まれることを想定シリアライズさですが、私は以下のエラーを取得します。
私は何が欠けていますか?
スキーマを明示的に読み取り、GenericRecordを送信する必要がありますか?

org.apache.kafka.common.errors.SerializationException:エラーシリアル化によって引き起こさアブロメッセージ
:java.lang.IllegalArgumentExceptionが:サポートされていないアブロタイプ。サポートされているタイプは、io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:123)〜[kafka-avro-serializer]のnull、Boolean、Integer、Long、Float、Double、String、byte []およびIndexedRecord
です。 -3.3.0.jar!/ :?
io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73)〜[kafka-avro-serializer-3.3.0.jar!/ :?]
io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)〜[kafka-avro-serializer-3.3.0.jar!/ :?]
at org.apache.kafka.clients。 producer.KafkaProducer.send(KafkaProducer.java:424)〜[kafka-clients-0.9.0.1.jar!/ :?]

答えて

1

コードが正しいようです。欠けている可能性があるのは、AVROオブジェクトがAVROプラグインで正しく生成されていないことです。つまり、クラスがSpecificRecordsを実装して、IndexedRecordを実装する必要があることを意味します。

+0

右。私は適切なavscファイルを作成し、それから、gradle-avro-pluginを使ってJavaファイルを生成した後、すべてがうまくいくように見えます。私は、スキーマがAvroによって自動的に作成されたと仮定しました。反射を使用して、私は間違っていました。 – msayag

関連する問題