コンフルエントなAvroスキーマレジストリ形式をサポートするバイナリエンコードAvroデータを吐き出すレガシーC++ベースのシステムがあります。私のJavaアプリケーションでは、KafkaAvroDeserializerクラスを使用してメッセージを正常に非直列化しましたが、メッセージを印刷できませんでした。Kafka Avroデコードメッセージを印刷できません
private void consumeAvroData(){
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "http://1.2.3.4:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", LongDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
// props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
props.put("schema.registry.url","http://1.2.3.4:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
System.out.println("Subscribed to topic " + TOPIC_NAME);
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records)
{
System.out.printf("value = %s\n",record.value());
}
}
}
私はデシリアライズされたデータを印刷することができないということであるのはなぜ出力が
{"value":"�"}
のですか?どんな助けにも感謝!
集密アブロシリアライザのためのワイヤフォーマットは、4バイトのスキーマIDとして続く「ワイヤフォーマット」http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
それは、単一の魔法のバイトだと題する節(現在は常に0)で、ここで説明されて
ありがとうございます!私は手動で(コンフルエントデシリアライザを使用せずに)バイト配列を解析しようとしましたが、私は魔法のバイト、スキーマIDを印刷することができますが、何らかの理由でデータを印刷できません。 – KarthikJ
そのデータはバイナリ形式です。あなたはそれを印刷することはできません。このトピックのスキーマレジストリに設定されているものとスキーマIDが一致していますか? –
はい。代わりの方法では、avscファイルを使用して解析されたデータをデコードし、GenericRecordを出力しようとしています。私は最初のバイトをスキーマIDとして2,3,4,5バイト、残りを(配列1の長さまで6)見ることができました。データとして扱い、Avro bytearrayデシリアライザを使ってデータ – KarthikJ