0
Kafka Avroシリアライザとデシリアライザは動作していません。私はkafkaコンソールのコンシューマーを使ってメッセージを消費しようとしました。メッセージが公開されているのがわかりました。Avroシリアライザとデシリアライザkafka java api
public class AvroProducer<T> {
private static Properties props;
static {
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
}
private static KafkaProducer producer = new KafkaProducer<>(props);
public byte[] createRecords(T pojo) throws IOException{
Schema.Parser parser = new Schema.Parser();
Schema schema = null;
try {
schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
} catch (IOException e) {
System.out.println(e.getLocalizedMessage());
}
final GenericData.Record record = new GenericData.Record(schema);
schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(pojo).getPropertyValue(r.name())));
SpecificDatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
try(ByteArrayOutputStream os = new ByteArrayOutputStream()){
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
writer.write(record, encoder);
encoder.flush();
byte[] bytemessage = os.toByteArray();
return bytemessage;
}
}
public static void sendMessage(byte[] bytemessage){
ProducerRecord precord = new ProducerRecord<StringSerializer, byte[]>("jason", bytemessage);
producer.send(precord);
}
}
public class AvroConsumer {
private static Properties kafkaProps;
static {
kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne");
}
@SuppressWarnings("unchecked")
public static void recieveRecord() throws IOException{
try (KafkaConsumer<String,byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
kafkaConsumer.subscribe(Arrays.asList("jason"));
while (true) {
ConsumerRecords<String,byte[]> records = kafkaConsumer.poll(100);
Schema.Parser parser = new Schema.Parser();
final Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
records.forEach(record -> {
SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
ByteArrayInputStream is = new ByteArrayInputStream(record.value());
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null);
try {
Syslog log = (Syslog) datumReader.read(null, binaryDecoder);
System.out.println("Value: " + log);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
}
スタックトレースは以下のとおりです。詳細はこちらをご覧ください。誰かが正しい実装で私を導くことができますか?問題は、値にアクセスするためのキャストrecord.howにあるようです。特定のデータムリーダーを使用してデータを読み取る方法。
Exception in thread "Thread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.cisco.daas.kafka.Syslog
at com.cisco.daas.kafka.AvroConsumer.lambda$0(AvroConsumer.java:46)
at java.lang.Iterable.forEach(Unknown Source)
at com.cisco.daas.kafka.AvroConsumer.recieveRecord(AvroConsumer.java:41)
at com.cisco.daas.kafka.MainApp$1.run(MainApp.java:32)
at java.lang.Thread.run(Unknown Source)
これは私がこの問題は、キーを使用することによって解決される
{
"namespace": "com.example.syslogmessage",
"type": "record",
"name": "SysLogMessage",
"fields": [{
"name": "partyID",
"type": "long"
},
{
"name": "applianceID",
"type": "string"
},
{
"name": "message",
"type": "string"
},
{
"name": "severity",
"type": "long"
},
{
"name": "messageType",
"type": "string"
},
{
"name": "eventtime",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "ipaddress",
"type": "string"
},
{
"name": "hostname",
"type": "string"
}
]
}