2017-07-17 16 views
0

私はカフカに来るすべてのメッセージを消費する以下のプログラムを持っています。すべてのメッセージが消えたら、kafka消費者を閉じますか?

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092']) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

上記のプログラムを使用すると、私はカフカに来るすべてのメッセージを消費することができます。しかし、一度すべてのメッセージが消費されると、私はカフカの消費者を閉じたいと思います。私は同じことに助けが必要です。

答えて

0

kafkaConsumerオブジェクトにconsumer_timeout_ms引数を指定すると、今度はkafkaの消費者を閉じられます。ミリ秒単位のタイムアウト値を受け付けます。 以下はコードスニペットです。上記のコードで

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092'], 
         consumer_timeout_ms=1000) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

消費者は、1秒間の任意のメッセージが表示されない場合には、セッションを終了します。

0

KafkaConsumer.close()の代わりにconsumer.close()が必要なようです。静的メソッドとしては記述されていません。

+0

私はconsumer.close()も使ってみましたが、forループ自体からclose()を実行していないようです。私は何かが分からないことがあるかもしれません。 – pankmish

+0

あなたのループは、もっと多くのメッセージが無期限に来るのを待っていると思います。あなたは、近くに到達するために、それを終了条件を与える必要があります。 – dawsaw

関連する問題