2017-11-13 7 views
0

Kafka Avroシリアライザとデシリアライザは動作していません。私はkafkaコンソールのコンシューマーを使ってメッセージを消費しようとしました。メッセージが公開されているのがわかりました。Avroシリアライザとデシリアライザkafka java api

public class AvroProducer<T> { 

    private static Properties props; 
    static { 
     props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); 
     props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer"); 
    } 

    private static KafkaProducer producer = new KafkaProducer<>(props); 

    public byte[] createRecords(T pojo) throws IOException{ 
     Schema.Parser parser = new Schema.Parser(); 
     Schema schema = null; 
     try { 
      schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc")); 
     } catch (IOException e) { 
      System.out.println(e.getLocalizedMessage()); 
     } 

     final GenericData.Record record = new GenericData.Record(schema); 
     schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(pojo).getPropertyValue(r.name()))); 
     SpecificDatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema); 
     try(ByteArrayOutputStream os = new ByteArrayOutputStream()){ 
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 

      writer.write(record, encoder); 
      encoder.flush(); 
      byte[] bytemessage = os.toByteArray(); 
      return bytemessage; 
     } 

    } 

    public static void sendMessage(byte[] bytemessage){ 
     ProducerRecord precord = new ProducerRecord<StringSerializer, byte[]>("jason", bytemessage); 
     producer.send(precord); 
    } 
} 



    public class AvroConsumer { 

    private static Properties kafkaProps; 

    static { 
     kafkaProps = new Properties(); 
     kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne"); 
    } 

    @SuppressWarnings("unchecked") 
    public static void recieveRecord() throws IOException{ 
     try (KafkaConsumer<String,byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) { 
      kafkaConsumer.subscribe(Arrays.asList("jason")); 
      while (true) { 
       ConsumerRecords<String,byte[]> records = kafkaConsumer.poll(100); 
       Schema.Parser parser = new Schema.Parser(); 
       final Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc")); 
       records.forEach(record -> { 
        SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema); 
        ByteArrayInputStream is = new ByteArrayInputStream(record.value()); 
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null); 
        try { 
         Syslog log = (Syslog) datumReader.read(null, binaryDecoder); 

         System.out.println("Value: " + log); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       }); 
      } 
     } 
    } 

} 

スタックトレースは以下のとおりです。詳細はこちらをご覧ください。誰かが正しい実装で私を導くことができますか?問題は、値にアクセスするためのキャストrecord.howにあるようです。特定のデータムリーダーを使用してデータを読み取る方法。

Exception in thread "Thread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.cisco.daas.kafka.Syslog 
at com.cisco.daas.kafka.AvroConsumer.lambda$0(AvroConsumer.java:46) 
at java.lang.Iterable.forEach(Unknown Source) 
at com.cisco.daas.kafka.AvroConsumer.recieveRecord(AvroConsumer.java:41) 
at com.cisco.daas.kafka.MainApp$1.run(MainApp.java:32) 
at java.lang.Thread.run(Unknown Source) 

これは私がこの問題は、キーを使用することによって解決される

{ 
    "namespace": "com.example.syslogmessage", 
    "type": "record", 
    "name": "SysLogMessage", 
    "fields": [{ 
      "name": "partyID", 
      "type": "long" 
     }, 
     { 
      "name": "applianceID", 
      "type": "string" 
     }, 
     { 
      "name": "message", 
      "type": "string" 
     }, 
     { 
      "name": "severity", 
      "type": "long" 
     }, 
     { 
      "name": "messageType", 
      "type": "string" 
     }, 
     { 
      "name": "eventtime", 
      "type": "long", 
      "logicalType": "timestamp-millis" 
     }, 
     { 
      "name": "ipaddress", 
      "type": "string" 
     }, 
     { 
      "name": "hostname", 
      "type": "string" 
     } 
    ] 
} 

答えて

0

を解析しようとしていますスキーマ、GenericRecord

public class AvroConsumer<T> { 

    private static Properties kafkaProps; 

    static { 
     kafkaProps = new Properties(); 
     kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne"); 
    } 

    public void recieveRecord() throws IOException { 
     try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) { 
      kafkaConsumer.subscribe(Arrays.asList("jason")); 
      while (true) { 
       ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100); 
       Schema.Parser parser = new Schema.Parser(); 
       final Schema schema = parser 
         .parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc")); 
       records.forEach(record -> { 
        SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(schema); 
        ByteArrayInputStream is = new ByteArrayInputStream(record.value()); 
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null); 
        try { 
         T log = datumReader.read(null, binaryDecoder); 
         System.out.println("Value: " + log); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       }); 
      } 
     } 
    } 

} 
の値であり、
関連する問題