2016-04-18 11 views
0

Avroスキーマに基づいて、スキーマに適したクラスで動作するクラス(データ)を生成しました データをエンコードしてから、他のアプリケーション "A"にkafkaを使用して送信しますAvro with Kafka - スキーマの変更による逆シリアル化

(アプリケーション「A」に)私はデシリアライザ

class DataDeserializer implements Deserializer<Data> { 
    private String encoding = "UTF8"; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     // nothing to do 
    } 

    @Override 
    public Tloog deserialize(String topic, byte[] data) { 
     try { 
      if (data == null) 
      { 
       return null; 
      } 
      else 
      { 
         DatumReader<Tloog> reader = new SpecificDatumReader<Data>(Data.class); 
         DecoderFactory decoderFactory = DecoderFactory.get(); 
         BinaryDecoder decoder = decoderFactory.binaryDecoder(data, null); 
         Data decoded = reader.read(null, decoder); 
         return decoded; 
      } 
     } catch (Exception e) { 
      throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); 
     } 
    } 

を実装することにより、データをdeserilize反対側に

Data data; // <- The object was initialized before . Here it is only the declaration "for example" 
EncoderFactory encoderFactory = EncoderFactory.get(); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);      
     DatumWriter<Tloog> writer;     
     writer = new SpecificDatumWriter<Data>(Data.class); 
     writer.write(data, encoder); 
     byte[] avroByteMessage = out.toByteArray(); 

問題は、このアプローチがIetheデータクラスははずSpecificDatumReaderの使用を必要とすることですと一体化するアプリケーションコード...これは問題になる可能性 - スキーマが変更される可能性があり、したがって、データのクラスを再生成し、もう一度 2の質問に統合する必要があります。

  1. は、私がアプリケーションにGenericDatumReaderを使用すべきか?どのようにそれを行う 正しく。 (私はスキーマをアプリケーションに単純に保存できます)
  2. Dataが変更された場合、SpecificDatumReaderを使用する簡単な方法はありますか?どのようにそれは多くのトラブルで統合することができますか?

おかげ

答えて

1

私はGenericDatumReaderを使用 - だけでなく、実際に私はそれから私のリーダークラスを派生していますが、ポイントを得ます。それを使うために、私はスキーを特別なカフカの話題にしています - Schema驚くほど十分です。消費者とプロデューサーの両方は、起動時にこのトピックを読み、それぞれのパーサーを設定します。

このようにすると、コンシューマとプロデューサを再起動することなく、即座にスキーマを更新できるようになります。これは私の設計目標でした。スキーマを追加または変更するためにアプリケーションを再起動する必要はありませんでした。だからSpecificDatumReaderが私のためにはうまくいかず、正直なところでThriftのようなものの代わりにAvroを最初に使用するのはなぜですか。

更新

アブロを行うには、通常の方法では、レコードのファイルにスキーマを格納することです。私はそんなことはしません。主に私ができないからです。私はKafkaを使用しているので、データを直接スキーマに格納することはできません。別のトピックにスキーマを格納する必要があります。

私のやり方は、まずすべてのスキーマを読み込みます。あなたはテキストファイルからそれらを読むことができます。私が言ったように、私はKafkaトピックからそれらを読んだ。

val schemaArray: Array[String] = Array(
    """{"name":"MyObj","type":"record","fields":[...]}""", 
    """{"name":"MyOtherObj","type":"record","fields":[...]}""" 
) 

ところでScalaをお詫び申し上げますが、それは私が得たものです:私はカフカからそれらを読んだ後、私はこのような配列があります。いずれにしても

、その後、あなたはパーサを作成する必要がある、とforeachのスキーマ、それを解析し、読者や作家を作成し、マップにそれらをオフに保存します。

val parser = new Schema.Parser() 
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*) 
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2))) 
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2))) 
var decoder: BinaryDecoder = null 

私は解析の前に、私はそれのすべてを行います実際のレコードです。パーサを設定するだけです。次に、個々のレコードをデコードするには、私は次のようにします:

+0

[Confluent Schema Registry](http://docs.confluent.io/1.0。1/schema-registry/docs/index.html)。 – vlahmot

+0

私はSchema Registryを見ました.CafkaバックエンドアーキテクチャにRESTfulインターフェイスを壊すのは非常に奇妙です。なぜあなたのクライアントはあなたのスキーマストリームと直接対話できないのでしょうか?それは、馬のチームと一緒に車のシャーシを引っ張るようなものです。確かに、カフカストリームをすでに消費しているこのようなユースケースの場合、スキーマを取得するためにRESTful呼び出しを行うことは望ましくありません。 –

+0

私は自動スキーマの進化とデータ破損からの保護のためにそれが好きでした。スキーマへの参照のみが格納され、各データポイントでの完全なスキーマは格納されないという事実もまた素晴らしいです。スキーマをフェッチするためのWeb呼び出しを追加することは、私たちにとって問題ではありませんでした。 – vlahmot

関連する問題