2017-02-11 22 views
1

メッセージのシリアライズ/デシリアライズにAvro Serialize with Apache kafkaを使用しようとしています。私は特定のタイプのメッセージをシリアライズしてキューに送信するために使用される1つのプロデューサを作成しています。メッセージがキューに正常に送信されると、コンシューマーはメッセージを選択して処理しようとしますが、試行中に例外が発生している間に、特定のオブジェクトへのケースバイトが発生します。例外は以下の通りである:Apache Kafka Avro Deserialization:特定のタイプのメッセージを逆シリアル化またはデコードできません。

カフカプロデューサーコード:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
     kafkaProducer = new KafkaProducer<>(kafkaProps); 
    } 


public static void main(String[] args) throws InterruptedException, IOException { 
     Customer customer1 = new Customer(1002, "Jimmy"); 

     Parser parser = new Parser(); 
     Schema schema = parser.parse(AvroSpecificProducer.class 
       .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

     SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
     try(ByteArrayOutputStream os = new ByteArrayOutputStream()) { 
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
      writer.write(customer1, encoder); 
      encoder.flush(); 

      byte[] avroBytes = os.toByteArray(); 

      ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry", 
        "Customer One 11 ", avroBytes 
      ); 

      asyncSend(record1); 
     } 

     Thread.sleep(10000); 
    } 

カフカの消費者

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51) 
    at java.lang.Iterable.forEach(Iterable.java:75) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

、我々はデータを読み取るためのいくつかのinconenientの方法を使用している例外によると、下記の当社のコードですコード:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1"); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
    } 

    public static void infiniteConsumer() throws IOException { 
     try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) { 
      kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry")); 

      while(true) { 
       ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100); 
       System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count()); 

       Schema.Parser parser = new Schema.Parser(); 
       Schema schema = parser.parse(AvroSpecificDeserializer.class 
         .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

       records.forEach(record -> { 
        DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null); 
        try { 
         System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); 
         Customer customer = customerDatumReader.read(null, binaryDecoder); 
         System.out.println(customer); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       }); 
      } 

     } 
    } 

コンソールでコンシューマを使用すると、正常にメッセージを受信できますe。だから私たちのpojoファイルにメッセージをデコードする方法は何ですか?

答えて

0

、このため

DatumReader<GenericRecord> customerDatumReader = new SpecificDatumReader<>(schema); 

代わりの

`DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 

正確な理由を使用しているこの問題の解決策は、まだ見つかっていません。これは、Kafkaがメッセージの構造を知らないため、メッセージのスキーマを明示的に定義し、GenericRecordはスキーマに従ってメッセージを読みやすいJSON形式に変換するのに便利です。 JSONを作成したら、POJOクラスに簡単に変換できます。

しかし、変換するソリューションをPOJOクラスに直接見つける必要があります。

0

値をProduceRecordに渡す前にAvroシリアル化を明示的に行う必要はありません。シリアライザはあなたのためにそれを行います。あなたのコードは次のようになります。

Customer customer1 = new Customer(1002, "Jimmy"); 
ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", customer1); 
    asyncSend(record1); 
} 

simple producer using avro

+0

ためコンフルエントから例を参照してください@Javierねえ、まず私の質問は、プロデューサーとコンシューマーないに関係していると思います。 Second:例を見てみると、 'JavaSessionize.avro.LogLine'はavroクラスのように見えるので、そのために直列化を処理するかもしれません。第三:私は汎用型変換ではなく特定型変換を使用しています。 Avroは8種類しかサポートしていません。スキーマ変換全体を定義する必要があります。 –

+0

@HarmeetSinghTaara私はプロデューサを調べることを提案しました。なぜなら、あなたが期待するものと異なる何かを連載すれば、消費者は失敗するからです。コンフルエントのkafka-avro-console-consumerを使用してイベントを読み、それが期待どおりに見えるかどうかを確認することをお勧めします。それでも問題が解決しない場合は、問題は消費者にはない。 –

+0

@HarmeetSinghTaara LogLineは[このavroスキーマ](https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/resources)を渡した結果です。 /avro/LogLine.avsc)を開き、自動的にJavaクラスを生成する 'maven-avro-plugin'を実行します。プロジェクトをコンパイルする場合は、自分の「Customer」クラスと外観を比較することができます。私は彼らが 'LogLine'の中で何らかのシリアライゼーションをしているとは思わないが、私はそれがちょうどPOJOだと確信している。 –

関連する問題