2016-10-31 10 views
0

Apache Kafka(非コンフルエントkafka)0.10を使用します。 kafkaを使ってAVROスキーマを設定したいと思います。 私は以下のようにavroスキーマを持っています。Avroスキーマを集中化したApache Kafka

public byte[] serializeMessage(EventMessage eventMessage) throws IOException { 

     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     DatumWriter<EventMessage> writer = new SpecificDatumWriter<EventMessage>(EventMessage.getClassSchema()); 
     writer.write(eventMessage, encoder); 
     encoder.flush(); 
     out.close(); 
     return out.toByteArray(); 
    } 

、のようなメッセージをシリアル化

{ 
    "namespace": "Rule", 
    "type": "record", 
    "name": "RuleMessage", 
    "fields": [ 
    { 
     "name": "station", 
     "type": "string" 
    }, 
    { 
     "name": "model", 
     "type": "string" 
    } 
} 

これは期待通りに働いています。

しかし、トピックレベルでAvroスキーマをセットアップしたいので、メッセージがavroスキーマを満たしていない場合はトピックがメッセージを拒否します。

とにかく、私はApache Kafka 0.10でこれを行うことができます。

おかげ

答えて

2

あなたがトピックにスキーマを関連付けるためのApacheカフカの0.10.0で(ライセンスのオープンソースとApache)コンフルエントのスキーマレジストリを使用することができます。 Avroシリアライザ/デシリアライザが付属しており、要求通りの方法でAvroスキーマを自動的に検証します。

「コンフルエントカフカ」のようなものではないことにご注意ください。これは商標違反となりますのでご注意ください。 ConfluentはApache Kafkaを簡単に配布して配布しますが、スキーマレジストリはgithub上にあるので、必要に応じてConfluentパッケージを使用せずに使用できます。

関連する問題