2017-06-07 19 views
0

ユーザエージェント、場所などのフィールドを持つjson辞書付きのクリックストリームデータを表すPythonのKafka Avroメッセージをリモートサーバから受信しています(Confluent Kafka Pythonライブラリのコンシューマを使用しています) 、URLなどです。メッセージは次のようになります。PythonでKafka Avro文字列をデコード/逆シリアル化する方法

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\[email protected]\x02\xec\xc09#J\[email protected]\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.' 

デコード方法は?私はbsonのデコードを試みましたが、文字列はUTF-8として認識されませんでした。私はhttps://github.com/verisign/python-confluent-schemaregistryを見つけましたが、Python 2.7しかサポートしていません。理想的には、Python 3.5以降とMongoDBを使ってデータを処理し、現在のインフラストラクチャとして保存したいと思っています。

答えて

0

AvroライブラリはAvroファイルを読み込むだけだと思っていましたが、実際には、以下のようにKafkaメッセージの解読の問題を解決しました。まずライブラリをインポートし、スキーマファイルをパラメータとして与え、私はコンシューマ・ループで使用できるディクショナリにメッセージを送信します。

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False 
関連する問題