コンフルエントなプラットフォーム0.9.0.1とkafka-avro-serializer 2.0.1を使用しています。カフカにイベントを送ろうとして、それらを読んでみると、イベントをJavaオブジェクトに戻す方法はわかりません。私はavroとコンフルエントなドキュメントを読んできましたが、これが実行可能であるというヒントがありますが、良い例を見つけることはできません。私のコードは、私がKafkaConsumerで読むときにGenericData $ Recordを返します。私の質問は、それをJava pojoに戻す方法です。私はこのオブジェクトをシリアル化するために使用したコードのbitを見つけました。ここで カフカを使用するときにJavaをAvroに変換して戻す
は私のコードです:あなたは、単純な手動POJO取り込むことができない理由import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.Properties;
/**
* This is a test...
*/
public class KafkaAvroProducerTest {
private static final Logger log = LogManager.getLogger(KafkaAvroProducerTest.class);
@Test
public void produceAndSendAndEvent() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);
log.debug("starting producer");
String topic = "topic11";
Schema schema = ReflectData.get().getSchema(Purchase.class);
Purchase purchase = new Purchase("appStore", 9.99d, DateTime.now().getMillis(), "BRXh2lf9wm");
ReflectDatumWriter<Purchase> reflectDatumWriter = new ReflectDatumWriter<>(schema);
GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
reflectDatumWriter.write(purchase, EncoderFactory.get().directBinaryEncoder(bytes, null));
GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
ProducerRecord record = new ProducerRecord<Object, Object>(topic, avroRecord);
Thread producerThread = new Thread(() -> {
try {
while(true) {
log.debug("send a message {}", record);
producer.send(record);
Thread.sleep(2000);
}
}catch(Exception ex) {
log.error("error", ex);
}
});
producerThread.start();
props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup");
props.put("auto.commit.enable", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
org.apache.kafka.clients.consumer.KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
Thread consumerThread = new Thread(() -> {
try {
while(true) {
try {
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, GenericRecord> record1 : records) {//
log.debug("read - {}", record1.value().getClass());
}
}catch(Exception ex) {
log.error("error", ex);
}
}
}catch(Exception ex) {
log.error("error", ex);
}
});
consumerThread.start();
System.in.read();
}
}
ありがとうございます!おそらくうまくいくでしょう。私は、avroがjsonのようなものだと誤って思っていたと思います。あなたは単にpojoをシリアライズしてデシリアライズすることができます。しかし、私が見るすべてのavroの例は、(特定のSpecificRecordBaseのサブクラスである)クラスを生成するためにmavenプラグインと共にスキーマを使用するか、GenericRecordを使用して、表示するようにフィールドを手動で設定します。私はシリアル化するために自分のオブジェクトを使用することを望んでいましたが、不可能であるようです。おそらく、スキーマとコードの生成ルートがあります。互換性を維持するのに役立ちます(クラスの変更に合わせてスキーマを変更できます)。 –
各メッセージを変換する実行オーバーヘッドはどのくらいですか?それは二度起こっていないのですか? – aasthetic
私が従うことができるかどうかわかりません。これは 'GenericRecord'からPOJO型への変換です。あなたは何が二度起こると思いますか? –