私は、暗号化されたテキストであるKafkaからのメッセージを消費するためにpython confluent-kafka apiをうまく使用できます。私はアブロエンコード(バイナリ)メッセージを消費しようとするとconfluent-kafka python avro messages
、私は上UnicodeDecodeError例外を取得:明らか
msg = kafka_consumer.poll(timeout=2.0)
、アブロエンコードされた値はバイナリではなく、ユニコード。 Java APIを使用すると、KafkaConsumerコンストラクタに対してKafkaAvroDeserializerを指定できます。 Python APIにはこれと並行または類似の設定オプションがないようです。
基本となるlibrdkafkaに渡すことができる設定オプションは、カスタム(デ)シリアライザ、デコーダ、またはavro関連のオプションを指定していないようです。
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
は、どのように私はPythonの合流・カフカAPIとアブロエンコードされたメッセージを消費していますか?