私はキーと値の両方のデータのシリアル化にKafka 0.10.2とAvroを使用しています。 今私はKafkaストリームを使用したいと思いますが、GenericData.Record
クラスのSerde
クラスを作成しようとしています。GenericData.Record用のKafkaAvro Serdeの書き方
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
これは私が持つことができるので、私は(とない私の具体的なPOJOのための)GenericData.Record
ためSerdeクラスを定義する必要がありコメント行
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
で取得していますエラーですデシリアライザは私にGenericData
を返さなければなりません(そして、私はこのステップの後に正しいPOJOを設定します)。
どうすれば完了ですか? ありがとうございました