2017-02-11 16 views
0

私たちのプロジェクトにはscalaとpythonのコードがあり、avroでエンコードされたメッセージをkafkaに送る/消費する必要があります。Avro Kafka scalaとPythonの間の変換の問題

私はavroのコードをpythonとscalaを使ってkafkaに送信しています。私は次のようにTwitterの全単射ライブラリを使用してアブロエンコードされたメッセージを送信Scalaのコードでプロデューサーを持っている:

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc") 
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString 
val schema = parser.parse(schemaFile) 
val recordInjection = GenericAvroCodecs[GenericRecord](schema) 
val avroRecord = new GenericData.Record(schema) 
avroRecord.put("url_sha256", row._1) 
avroRecord.put("url", row._2._1) 
avroRecord.put("timestamp", row._2._2) 
val recordBytes = recordInjection.apply(avroRecord) 
kafkaProducer.value.send("topic", recordBytes) 

アブロスキーマは以下のようになります私はKafkaConsumerに成功し、それを復号化することができる午前

{ 
    "namespace": "com.rm.avro", 
    "type": "record", 
    "name": "url_info", 
    "fields":[ 
    { 
     "name": "url_sha256", "type": "string" 
    }, 
    { 
     "name": "url", "type": "string" 
    }, 
    { 
     "name": "timestamp", "type": ["long"] 
    } 
] 

}

スカラーで

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc") 
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString 


kafkaInputStream.foreachRDD(kafkaRDD => { 
    kafkaRDD.foreach(

    avroRecord => { 
     val parser = new Schema.Parser() 
     val schema = parser.parse(schemaFile) 
     val recordInjection = GenericAvroCodecs[GenericRecord](schema) 
     val record = recordInjection.invert(avroRecord.value()).get 
     println(record) 
    } 
) 

} 

ただし、私はメッセージをデコードできません私は例外に

'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte 

Pythonのコードを、次の取得のpythonでのESは以下のようになります。 schema_path = "アブロ/ url_info_schema.avsc" スキーマ= avro.schema.parse(オープン(schema_path).read())

for msg in consumer: 
    bytes_reader = io.BytesIO(msg.value) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schema) 
    decoded_msg = reader.read(decoder) 
    print(decoded_msg) 

また、python avroプロデューサメッセージはscala avroコンシューマによって認識されません。私は例外がそこにある。 Python Avroプロデューサは次のようになります:

datum_writer = DatumWriter(schema) 
bytes_writer = io.BytesIO() 

datum_writer = avro.io.DatumWriter(schema) 
encoder = avro.io.BinaryEncoder(bytes_writer) 
datum_writer.write(data, encoder) 
raw_bytes = bytes_writer.getvalue() 
producer.send(topic, raw_bytes) 

どうすればPythonとscalaの間で一貫性を保つことができますか?すべてのポインタは素晴らしいでしょう

+0

解決策を見つけました。まもなくソリューションを掲載する予定です。それは他人を助けるかもしれない。 – Abhishek

答えて

1

私はバイナリエンコーダをPythonで使用していましたが、Scalaでは何も使用していませんでした。ただ、私は他の人がそれが役に立つことを願っ

val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema) 

val recordInjection = GenericAvroCodecs[GenericRecord](schema) 

から1行を変更しなければなりませんでした。 Pythonコードで変更する必要はありません

関連する問題