2017-10-19 4 views
0

Producer.sendコールバックはメッセージオブジェクトを提供します。 message.offset()は、バグのように見えることが多く、0を返します。confluent-python kafkaプロデューサはコールバックを送りますmessage.offset()は0を返します

コンフルエントカフカのPythonライブラリのバージョン0.11.0 librdkafka:安定した0.11.0(ボトル入り)、HEAD。マックOS自作

次の簡単なテストプログラムを介してインストール:

import confluent_kafka 
import timeit 


def delivery_callback(error, message): 
    print("delivery_callback. error={}. message={}".format(error, message)) 
    print("message.topic={}".format(message.topic())) 
    print("message.timestamp={}".format(message.timestamp())) 
    print("message.key={}".format(message.key())) 
    print("message.value={}".format(message.value())) 
    print("message.partition={}".format(message.partition())) 
    print("message.offset={}".format(message.offset())) 


def produce_string_messages(kafka_producer, topic_name, num_messages): 
    start_time = timeit.default_timer() 

    for i in range(num_messages): 
     kafka_producer.produce(topic_name, value="cf-k test. v{}".format(i), on_delivery=delivery_callback) 

    elapsed = timeit.default_timer() - start_time 
    print("completed producing messages. They are queued for delivery. elapsed={}. elapsed/msg={}".format(elapsed, elapsed/num_messages)) 


if __name__ == "__main__": 
    print("starting") 

    conf = { 
     'bootstrap.servers': "kafka-broker-1:9092" 
    } 

    kafka_producer = confluent_kafka.Producer(conf) 

    print("opened KafkaProducer") 
    produce_string_messages(kafka_producer, "my-string-topic", 3) 

    print("flushing...") 
    kafka_producer.flush() 

    print("exiting") 

は生成します。message.offset()は最初の二つのメッセージと第三のための非ゼロはゼロである

starting 
opened KafkaProducer 
completed producing messages. They are queued for delivery. elapsed=0.000994920730591. elapsed/msg=0.00033164024353 
flushing... 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v0 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v1 
message.partition=0 
message.offset=0 
delivery_callback. error=None. message=<cimpl.Message object at 0x10f986ec0> 
message.topic=my-string-topic 
message.timestamp=(1, 1508451238822L) 
message.key=None 
message.value=cf-k test. v2 
message.partition=0 
message.offset=24 
exiting 

ていることに注意してください。このテストプログラムを再度実行して3つのメッセージを送信すると、3番目のmessage.offsetが3ずつ増加します。これはちょうどmessage.offset()が間違って0を返すバグのようです。

答えて

0

配信レポートでは有効なオフセット生成されたバッチの最後のメッセージです。これはそうのように、trueにproduce.offset.reportトピック・レベルの構成プロパティを設定することで、バッチ内のすべてのメッセージのための適切なオフセットを提供するために変更することができます。

p = confluent_kafka.Producer({'bootstrap.servers': ..., 
           'default.topic.config': { 'produce.offset.report': True } }) 

私たちは、将来のリリースでは真であると、デフォルトを変更しますPythonクライアント。

[1]:バッチ内のメッセージのリニアスキャンを回避しますが、パフォーマンスに与える影響はPythonの土地ではほとんど無関係ですので、心配する必要はありません。

+0

パーフェクト。ありがとう!主な作者として、これに反応することができます:https://stackoverflow.com/questions/44732214/apt-get-install-librdkafka1-fails-on-debian-9-x-due-to-libssl-dependency – clay

関連する問題