2016-01-05 9 views
6

私はKafkaとavroにとって全く新しいので、コンフルエントなパッケージを使用しようとしています。私たちはJPAのために使用する既存のPOJOを持っており、各値を手動で汎用レコードに反映させることなくPOJOのインスタンスを作成することができます。私はこれがドキュメントでどのように行われているのか分からないようです。pojosをconfluent.ioの汎用レコードに変換してKafkaProducerを送信する

例は、一般的なレコードを使用し、そのように1ずつ値1を設定します。

String key = "key1"; 
String userSchema = "{\"type\":\"record\"," + 
        "\"name\":\"myrecord\"," + 
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; 
Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(userSchema); 
GenericRecord avroRecord = new GenericData.Record(schema); 
avroRecord.put("f1", "value1"); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

ありクラスからスキーマを取得するためのいくつかの例があると私は注釈が必要なように、そのスキーマを変更することが判明しました。今私はPOJOのインスタンスを取ってそのままシリアライザに送り、クラスのスキーマをマッチングして値を汎用レコードにコピーする作業をライブラリに持たせますか?私はこれについてすべて間違っているのですか?私がやりたいことは次のようなものです:

String key = "key1"; 
Schema schema = ReflectData.get().getSchema(myObject.getClass()); 
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

ありがとう!

答えて

1

私は、このインスタンスでは私自身のシリアライザを作成巻き上げる:

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer { 
    private final EncoderFactory encoderFactory = EncoderFactory.get(); 

    @Override 
    protected byte[] serializeImpl(String subject, Object object) throws SerializationException { 
     //TODO: consider caching schemas 
     Schema schema = null; 

     if(object == null) { 
     return null; 
     } else { 
     try { 
      schema = ReflectData.get().getSchema(object.getClass()); 
      int e = this.schemaRegistry.register(subject, schema); 
      ByteArrayOutputStream out = new ByteArrayOutputStream(); 
      out.write(0); 
      out.write(ByteBuffer.allocate(4).putInt(e).array()); 

      BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); 
      DatumWriter<Object> writer = new ReflectDatumWriter<>(schema); 
      writer.write(object, encoder); 
      encoder.flush(); 
      out.close(); 

      byte[] bytes = out.toByteArray(); 
      return bytes; 
     } catch (IOException ioe) { 
      throw new SerializationException("Error serializing Avro message", ioe); 
     } catch (RestClientException rce) { 
      throw new SerializationException("Error registering Avro schema: " + schema, rce); 
     } catch (RuntimeException re) { 
      throw new SerializationException("Error serializing Avro message", re); 
     } 
     } 
    } 
} 
関連する問題