私はKafkaのPythonハイレベルコンシューマを使用しており、トピックの各パーティションの最新のオフセットを知りたいと考えています。しかし、私はそれを働かせることはできません。kafkaトピックのパーティションの最新のオフセットを取得する方法は?
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
しかし、私が手出力は、私がassign
を使用して、代替的なアプローチを持っていますが、結果はそれは私がかもしれないことを文書の一部から思わ同じ
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
ある
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
ですfetch
が発行されていない場合は、この動作が発生します。しかし、私はそれを強制する方法を見つけることができません。私は間違って何をしていますか?
トピックの最新のオフセットを取得する別の方法と簡単な方法がありますか?
ない100%正、私はあなたのコードはカフカ-python'が実際にブローカーに接続された '前HIGHWATERの値を返していると思います。 'KafkaConsumer'は非同期なので、実際には、ハイウォーターの値を入力するメッセージを消費しなければならないと思います。https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –