Apache Kafka(非コンフルエントkafka)0.10を使用します。 kafkaを使ってAVROスキーマを設定したいと思います。 私は以下のようにavroスキーマを持っています。Avroスキーマを集中化したApache Kafka
public byte[] serializeMessage(EventMessage eventMessage) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
DatumWriter<EventMessage> writer = new SpecificDatumWriter<EventMessage>(EventMessage.getClassSchema());
writer.write(eventMessage, encoder);
encoder.flush();
out.close();
return out.toByteArray();
}
、のようなメッセージをシリアル化
{
"namespace": "Rule",
"type": "record",
"name": "RuleMessage",
"fields": [
{
"name": "station",
"type": "string"
},
{
"name": "model",
"type": "string"
}
}
これは期待通りに働いています。
しかし、トピックレベルでAvroスキーマをセットアップしたいので、メッセージがavroスキーマを満たしていない場合はトピックがメッセージを拒否します。
とにかく、私はApache Kafka 0.10でこれを行うことができます。
おかげ