Debeziumコネクタのカフカ接続イベントはAvroエンコードされています。カフカのトピックavroメッセージを読み取ることができません
カフカ接続スタンドアロンサービスに渡されるconnect-standalone.propertiesで次のように言及されています。これらの特性を有するカフカコンシューマコードを設定
key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
:消費者の実装で
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
、次のキーと値の成分を読み取るためのコードです。 RESTを使用してSchema Registryからキーと値のスキーマを取得しています。
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
キーの解析が正常に機能しました。メッセージの値の部分を解析する際に、ArrayIndexOutOfBoundsExceptionが発生しています。
Avroのソースコードをダウンロードしてデバッグしました。 GenericDatumReader.readIntメソッドが負の値を返すことが判明しました。この値は配列(シンボル)のインデックスであると予想され、したがって、正であったはずです。
kafka-avro-standalone-consumerを使用してイベントを消費しようとしましたが、ArrayIndexOutOfBoundsExceptionがスローされました。だから、私の推測では、メッセージがKafka connect(プロデューサー)&で不適切にエンコードされているというのは、設定に問題があるということです。続き
が質問です:
- 生産者や消費者に渡された設定に何か問題はありますか?
- なぜキーの逆シリアル化は機能しますが、それは価値がありませんか?
- 作業に必要なことはありますか? (どこかで文字エンコーディングを指定するなど)。
- AvbeでDebeziumを生産に使用できますか、それとも現在は実験的な機能ですか? Debezium Avroの記事では、Avroを含む例が将来含まれると述べています。
Avroの逆シリアル化がArrayIndexOutOfBoundsExceptionをスローしたが、私が直面している問題に関連付けることができなかった投稿が多数ありました。