トピックが40個あります。設定は次のとおりです。コンフルエントKafka:コンシューマは、トピックのすべてのパーティションについて最初から読み込みません。
def on_assign (c,ps):
for p in ps:
p.offset=0
print ps
c.assign(ps)
conf = {'bootstrap.servers': 'localhost:9092'
'enable.auto.commit' : False,
'group.id' : 'confluent_consumer',
'default.topic.config': {'auto.offset.reset': 'earliest'}
}
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
トピックtopic.source
のすべてのパーティションについて、オフセット0から読みたいと思います。しかし、私はそれがすべてのパーティションで起こっているとは思わない。いくつかのパーティションでは、コミットされたオフセットであると仮定している特定のオフセットから読み込み、毎回group.id
を変更するとどちらも役に立ちません。コミットされたオフセットに関係なく、このトピックのすべてのパーティションについて、最初からどのように読み込むことができますか?
私はon_assign()
でps
を印刷し、それはすべての40個のパーティションのために、このような何かを印刷:
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
こんにちはミカエル、あなたの答えに感謝します。トピックのパーティションの開始オフセットを知るためのコマンド/ツールはありますか?(以前のメッセージが保存ポリシーのために削除された場合) – NoName
ブローカをチェックしたい場合は、kafkaの 'log.dirs'に入り、パーティションのディレクトリを見つけることができます。内部に '* .log'ファイルがあります。ファイルの名前は、最初のオフセットを示す必要があります。たとえば、 '00000000000000000216.log'と表示された場合、216は最初のオフセットです。設定に応じて、いくつかのログファイルが存在する可能性があります。最小の名前をとります。 –
こんにちはミケール!ありがとう、私はlog.dirsを見つけた、私は* .logファイルを見ることができる – NoName