2016-04-26 4 views
2

コンフルエントなプラットフォーム0.9.0.1とkafka-avro-serializer 2.0.1を使用しています。カフカにイベントを送ろうとして、それらを読んでみると、イベントをJavaオブジェクトに戻す方法はわかりません。私はavroとコンフルエントなドキュメントを読んできましたが、これが実行可能であるというヒントがありますが、良い例を見つけることはできません。私のコードは、私がKafkaConsumerで読むときにGenericData $ Recordを返します。私の質問は、それをJava pojoに戻す方法です。私はこのオブジェクトをシリアル化するために使用したコードのbitを見つけました。ここで カフカを使用するときにJavaをAvroに変換して戻す

は私のコードです:あなたは、単純な手動POJO取り込むことができない理由

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericDatumReader; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.io.EncoderFactory; 
import org.apache.avro.reflect.ReflectData; 
import org.apache.avro.reflect.ReflectDatumWriter; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
import org.joda.time.DateTime; 
import org.junit.Test; 

import java.io.ByteArrayOutputStream; 
import java.util.Collections; 
import java.util.Properties; 

/** 
* This is a test... 
*/ 
public class KafkaAvroProducerTest { 
    private static final Logger log = LogManager.getLogger(KafkaAvroProducerTest.class); 

    @Test 
    public void produceAndSendAndEvent() throws Exception { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       org.apache.kafka.common.serialization.StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put("schema.registry.url", "http://localhost:8081"); 
     KafkaProducer producer = new KafkaProducer(props); 

     log.debug("starting producer"); 
     String topic = "topic11"; 
     Schema schema = ReflectData.get().getSchema(Purchase.class); 
     Purchase purchase = new Purchase("appStore", 9.99d, DateTime.now().getMillis(), "BRXh2lf9wm"); 

     ReflectDatumWriter<Purchase> reflectDatumWriter = new ReflectDatumWriter<>(schema); 
     GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema); 
     ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
     reflectDatumWriter.write(purchase, EncoderFactory.get().directBinaryEncoder(bytes, null)); 
     GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null)); 
     ProducerRecord record = new ProducerRecord<Object, Object>(topic, avroRecord); 

     Thread producerThread = new Thread(() -> { 
      try { 
       while(true) { 
        log.debug("send a message {}", record); 
        producer.send(record); 
        Thread.sleep(2000); 
       } 
      }catch(Exception ex) { 
       log.error("error", ex); 
      } 
     }); 
     producerThread.start(); 

     props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("group.id", "testGroup"); 
     props.put("auto.commit.enable", "false"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); 
     props.put("schema.registry.url", "http://localhost:8081"); 
     org.apache.kafka.clients.consumer.KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer(props); 
     kafkaConsumer.subscribe(Collections.singletonList(topic)); 

     Thread consumerThread = new Thread(() -> { 
      try { 
       while(true) { 
        try { 
         ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000); 
         for (ConsumerRecord<String, GenericRecord> record1 : records) {// 
          log.debug("read - {}", record1.value().getClass()); 
         } 
        }catch(Exception ex) { 
         log.error("error", ex); 
        } 
       } 
      }catch(Exception ex) { 
       log.error("error", ex); 
      } 
     }); 
     consumerThread.start(); 
     System.in.read(); 
    } 
} 

答えて

2

私はアブロを使用することはありませんが、https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.htmlを見て...

class MyPojo { 
    public int v1; 
    public String v2; 
} 

// copied from your example code 
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000); 
for (ConsumerRecord<String, GenericRecord> record1 : records) { 
    GenericRecord avroRecord = record1.value(); 
    MyPojo pojo = new MyPojo(); 
    pojo.v1 = (Integer)avroRecord.get("<fieldname1>"); 
    pojo.v2 = (String)avroRecord.get("<fieldname2>"); 

    // process current pojo 
} 

、これは理にかなっているかどうかわかりません。これが動作すれば、コンストラクタMyPojo(GenericRecord)に移動します。

+1

ありがとうございます!おそらくうまくいくでしょう。私は、avroがjsonのようなものだと誤って思っていたと思います。あなたは単にpojoをシリアライズしてデシリアライズすることができます。しかし、私が見るすべてのavroの例は、(特定のSpecificRecordBaseのサブクラスである)クラスを生成するためにmavenプラグインと共にスキーマを使用するか、GenericRecordを使用して、表示するようにフィールドを手動で設定します。私はシリアル化するために自分のオブジェクトを使用することを望んでいましたが、不可能であるようです。おそらく、スキーマとコードの生成ルートがあります。互換性を維持するのに役立ちます(クラスの変更に合わせてスキーマを変更できます)。 –

+0

各メッセージを変換する実行オーバーヘッドはどのくらいですか?それは二度起こっていないのですか? – aasthetic

+0

私が従うことができるかどうかわかりません。これは 'GenericRecord'からPOJO型への変換です。あなたは何が二度起こると思いますか? –

関連する問題