2017-06-06 12 views
0

私は自分のラップトップでローカルで作業をしていて、リモートサーバー 'xxxxx'からトピック 'テスト'を読み込もうとしています。コンソールを使用している場合 、私は飼育係、カフカ、その後、消費者の起動:コンソールでKafkaメッセージを消費できますが、Pythonライブラリでは使用できません。

bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties 
bin/kafka-console-consumer.sh --bootstrap-server xxxxx:9092 --topic test --from-beginning 

をし、メッセージがコンソールに表示されます。続くようPythonライブラリを使用した場合 しかし、私は何も見なかっ:リモートメッセージを消費しようとしたとき、私はまた、成功したPythonのカフカの消費者にコンソールにカフカからローカルにメッセージを送ることができます

from kafka import KafkaConsumer 

server = {'server': 'xxxxx:9092', 'topic': 'test'} 

# To consume latest messages and auto-commit offsets 
consumer = KafkaConsumer(server['topic'], 
         group_id='my-group', 
         bootstrap_servers=server['server']) 

for message in consumer: 
    # message value and key are raw bytes -- decode if necessary! 
    # e.g., for unicode: `message.value.decode('utf-8')` 
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
             message.offset, message.key, 
             message.value)) 

を、問題が発生します。 また、リモートサーバーとの接続が確立されているようですが、残念ながら何も届いていません。私が見つけた

答えて

1

ソリューションは、別のライブラリ、コンフルエントカフカのPythonを使用しており、このライブラリはちょうど

編集を聞くために、サーバのIPおよびトピックの名前を設定することで、箱から出して働いていた:ここソリューションです私は実装しました:

AvroライブラリはAvroファイルを読み込むだけで、実際には以下のようにKafkaメッセージの解読の問題を解決したと思います:まずライブラリをインポートし、スキーマファイルをパラメータとして与えてから関数を作成しますメッセージを辞書にデコードして、コンシューマー・ループで使用できるようにします。

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False 
関連する問題