0

トピックが40個あります。設定は次のとおりです。コンフルエントKafka:コンシューマは、トピックのすべてのパーティションについて最初から読み込みません。

def on_assign (c,ps): 
for p in ps: 
    p.offset=0 
print ps 
c.assign(ps) 

conf = {'bootstrap.servers': 'localhost:9092' 
     'enable.auto.commit' : False, 
     'group.id' : 'confluent_consumer', 
     'default.topic.config': {'auto.offset.reset': 'earliest'} 
     } 
consumer = Consumer(**conf) 
consumer.subscribe(['topic.source'], on_assign=on_assign) 

msg = consumer.poll(timeout=100000) 
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key()) 

トピックtopic.sourceのすべてのパーティションについて、オフセット0から読みたいと思います。しかし、私はそれがすべてのパーティションで起こっているとは思わない。いくつかのパーティションでは、コミットされたオフセットであると仮定している特定のオフセットから読み込み、毎回group.idを変更するとどちらも役に立ちません。コミットされたオフセットに関係なく、このトピックのすべてのパーティションについて、最初からどのように読み込むことができますか?

私はon_assign()psを印刷し、それはすべての40個のパーティションのために、このような何かを印刷:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on 

答えて

1

あなたが新しい値にgroup.idを設定するか、いずれかに設定auto.offset.resetオフセットを犯していないグループを使用して使用している場合earliestこの場合、コンシューマはパーティションの先頭から開始します。

あなたのブローカーのログ保存設定によっては、Kafkaがメッセージを削除することがあります。そのため、パーティション内の利用可能なメッセージの1つがオフセットの可能性があります。

+0

こんにちはミカエル、あなたの答えに感謝します。トピックのパーティションの開始オフセットを知るためのコマンド/ツールはありますか?(以前のメッセージが保存ポリシーのために削除された場合) – NoName

+0

ブローカをチェックしたい場合は、kafkaの 'log.dirs'に入り、パーティションのディレクトリを見つけることができます。内部に '* .log'ファイルがあります。ファイルの名前は、最初のオフセットを示す必要があります。たとえば、 '00000000000000000216.log'と表示された場合、216は最初のオフセットです。設定に応じて、いくつかのログファイルが存在する可能性があります。最小の名前をとります。 –

+0

こんにちはミケール!ありがとう、私はlog.dirsを見つけた、私は* .logファイルを見ることができる – NoName

関連する問題