6
私はKafkaとavroにとって全く新しいので、コンフルエントなパッケージを使用しようとしています。私たちはJPAのために使用する既存のPOJOを持っており、各値を手動で汎用レコードに反映させることなくPOJOのインスタンスを作成することができます。私はこれがドキュメントでどのように行われているのか分からないようです。pojosをconfluent.ioの汎用レコードに変換してKafkaProducerを送信する
例は、一般的なレコードを使用し、そのように1ずつ値1を設定します。
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}
ありクラスからスキーマを取得するためのいくつかの例があると私は注釈が必要なように、そのスキーマを変更することが判明しました。今私はPOJOのインスタンスを取ってそのままシリアライザに送り、クラスのスキーマをマッチングして値を汎用レコードにコピーする作業をライブラリに持たせますか?私はこれについてすべて間違っているのですか?私がやりたいことは次のようなものです:
String key = "key1";
Schema schema = ReflectData.get().getSchema(myObject.getClass());
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}
ありがとう!