2017-07-07 1 views
0

私はトピック1のKafkaに5つのメッセージを生成し、それを正常に消費しました。 6番目のメッセージを送信して消費しようとすると、最新(6番目)のメッセージではなく、6個のメッセージがすべて再取得されています。auto.offset.reset =が最大の場合でも、消費者は毎回トピックからのすべてのメッセージを読むのはなぜですか?

私は、データベースコネクタ(アクセスモジュール)ではなく、コンシューマコマンドラインを実行しています。そして、コネクタ(以下ログからすべての設定プロパティを参照してください)「最大」に設定しauto.offset.reset設定プロパティを持つ

また、以下のOffsetChecker出力を参照してください。

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ 
    --group testjob --zookeeper localhost:2181 --topic topic1 

[2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) 
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0. 

は誰でした問題がどこにあるのか教えてください。ここで

は、設定プロパティを示したログです:

***Global config Properties*** 
*    client.id = rdkafka 
*    message.max.bytes = 1200 
*    receive.message.max.bytes = 100000000 
*    metadata.request.timeout.ms = 60000 
*    topic.metadata.refresh.interval.ms = 600000 
*    topic.metadata.refresh.fast.cnt = 10 
*    topic.metadata.refresh.fast.interval.ms = 250 
*    topic.metadata.refresh.sparse = false 
*    socket.timeout.ms = 60000 
*    socket.send.buffer.bytes = 0 
*    socket.receive.buffer.bytes = 0 
*    socket.keepalive.enable = false 
*    socket.max.fails = 3 
*    broker.address.ttl = 300000 
*    broker.address.family = any 
*    statistics.interval.ms = 0 
*    log_cb = 0x7fecb80c6dd0 
*    log_level = 6 
*    socket_cb = 0x7fecb80cd2f0 
*    open_cb = 0x7fecb80ddd30 
*    opaque = 0x2641280 
*    internal.termination.signal = 0 
*    queued.min.messages = 100000 
*    queued.max.messages.kbytes = 1000000 
*    fetch.wait.max.ms = 100 
*    fetch.message.max.bytes = 1049776 
*    fetch.min.bytes = 1 
*    fetch.error.backoff.ms = 500 
*    group.id = testjob 
*    queue.buffering.max.messages = 100000 
*    queue.buffering.max.ms = 1000 
*    message.send.max.retries = 2 
*    retry.backoff.ms = 100 
*    compression.codec = none 
*    batch.num.messages = 1000 
*    delivery.report.only.error = false 
*    request.required.acks = 1 
*    enforce.isr.cnt = 0 
*    request.timeout.ms = 5000 
*    message.timeout.ms = 300000 
*    produce.offset.report = false 
*    auto.commit.enable = true 
*    auto.commit.interval.ms = 60000 
*    auto.offset.reset = largest <<<<-------- 
*    offset.store.path = . 
*    offset.store.sync.interval.ms = 0 
*    offset.store.method = file 
*    consume.callback.max.messages = 0 
+2

どのように消費者を実行したのに動作します。このプロパティを追加しますか?完全なコマンドラインは、何がうまくいかないのかを診断するのに役立ちます。 – amethystic

+0

あなたの消費者を知ることなく、推測できるのは1つだけです。おそらく、消費者が60歳未満(auto.commit.interval)に達し、正常にシャットダウンする代わりに殺された可能性があります。 zookeeperのノードが欠落していることについて:ZKにオフセットを送信しない「新しいコンシューマ」を実行している可能性があります。または、ZKのルートパスに書き込まないでください(私がお勧めします)。ブローカーの設定(zookeeper.connect)を確認してください。これは 'localhost:2181/kafka'のようになります。この場合、オフセットチェッカーを実行するときにZK接続文字列にパスを追加する必要があります。 – TobiSH

答えて

0

は AUTO_OFFSET_RESET_CONFIGは=「早い」それは

関連する問題