2016-11-11 10 views
2

私はavro形式でKafkaメッセージを消費しようとしていますが、Goのavroからjsonへのメッセージをデコードできません。go中のKafka Avroメッセージを消費する

私はConfluentプラットフォーム(3.0.1)を使用しています。たとえば、私は次のようなavroメッセージを生成します:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' 
{"f1":"message1"} 
{"f1":"message2"} 

私はgo Kafka libary:saramaでメッセージを消費します。プレーンテキストメッセージが正常に動作しています。 Avroメッセージはデコードする必要があります。

しかし、復号後、私は価値のないJSON(両方LIBS)を取得github.com/elodina/go-avro github.com/linkedin/goavro、::私は別のLIBSを見つけ

{"f1":""} 

goavro:

avroSchema := ` 
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]} 
` 
codec, err := goavro.NewCodec(avroSchema) 
if err != nil { 
    log.Fatal(err) 
} 
bb := bytes.NewBuffer(msg.Value) 
decoded, err := codec.Decode(bb) 
log.Println(fmt.Sprintf("%s", decoded)) 

ゴーアブロ:

schema := avro.MustParseSchema(avroSchema) 
reader := avro.NewGenericDatumReader() 
reader.SetSchema(schema) 
decoder := avro.NewBinaryDecoder(msg.Value) 
decodedRecord := avro.NewGenericRecord(schema) 
log.Println(decodedRecord.String()) 

MSG = sarama.ConsumerMessage

答えて

1

ちょうど私がメッセージのバイト配列の最初の5つの要素を削除しなければならなかったこと(バイナリアブロメッセージを比較することで)見つけた - 今ではすべてが動作します:)

message = msg.Value[5:] 

たぶん誰かがなぜ

3

説明することができます最初のバイトはデータ型を識別する魔法です。次の4バイトは、スキーマレジストリを使用する場合にのみ有効なavroスキームIDです。