私たちのプロジェクトにはscalaとpythonのコードがあり、avroでエンコードされたメッセージをkafkaに送る/消費する必要があります。Avro Kafka scalaとPythonの間の変換の問題
私はavroのコードをpythonとscalaを使ってkafkaに送信しています。私は次のようにTwitterの全単射ライブラリを使用してアブロエンコードされたメッセージを送信Scalaのコードでプロデューサーを持っている:
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
avroRecord.put("url_sha256", row._1)
avroRecord.put("url", row._2._1)
avroRecord.put("timestamp", row._2._2)
val recordBytes = recordInjection.apply(avroRecord)
kafkaProducer.value.send("topic", recordBytes)
アブロスキーマは以下のようになります私はKafkaConsumerに成功し、それを復号化することができる午前
{
"namespace": "com.rm.avro",
"type": "record",
"name": "url_info",
"fields":[
{
"name": "url_sha256", "type": "string"
},
{
"name": "url", "type": "string"
},
{
"name": "timestamp", "type": ["long"]
}
]
}
スカラーで
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
kafkaInputStream.foreachRDD(kafkaRDD => {
kafkaRDD.foreach(
avroRecord => {
val parser = new Schema.Parser()
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val record = recordInjection.invert(avroRecord.value()).get
println(record)
}
)
}
ただし、私はメッセージをデコードできません私は例外に
'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte
Pythonのコードを、次の取得のpythonでのESは以下のようになります。 schema_path = "アブロ/ url_info_schema.avsc" スキーマ= avro.schema.parse(オープン(schema_path).read())
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_msg = reader.read(decoder)
print(decoded_msg)
また、python avroプロデューサメッセージはscala avroコンシューマによって認識されません。私は例外がそこにある。 Python Avroプロデューサは次のようになります:
datum_writer = DatumWriter(schema)
bytes_writer = io.BytesIO()
datum_writer = avro.io.DatumWriter(schema)
encoder = avro.io.BinaryEncoder(bytes_writer)
datum_writer.write(data, encoder)
raw_bytes = bytes_writer.getvalue()
producer.send(topic, raw_bytes)
どうすればPythonとscalaの間で一貫性を保つことができますか?すべてのポインタは素晴らしいでしょう
解決策を見つけました。まもなくソリューションを掲載する予定です。それは他人を助けるかもしれない。 – Abhishek