0

私はカフカストリームアプリケーションを持っています。私のアプリケーションがイベントを正常に処理しています。どのようにカフカがコンシューマーオフセットをコミットしましたか?

イベントを再処理/スキップするために必要なオフセットを使用して、カフカコミット済消費者オフセットを変更する方法。私はHow to change start offset for topic?を試しました。しかし、私は 'Node does not exist:'というエラーが出ます。私を助けてください。

答えて

2

あなたが参照している質問/回答は古いカフカバージョンに基づいています。 Kafka 0.9以来、オフセットはZooKeeperにコミットされていませんが、オフセットトピック(トピック名は__consumer_offsets)と呼ばれる特別なカフカトピックに格納されています。

Kafka 1.0以降、コマンドラインツールbin/kafka-consumer-groups.shには、オフセットを設定できる新しい機能があります。元のKIPをチェックしてください:https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

このツールは、Kafka 0.11(さらに古いKafkaバージョン)でも動作します。

代替は、対応するgroup.id持つ単一KafkaConsumerを使用して、独自のツールを、書くことで、あなたは、seek()commit()オフセットのオフセットを変更したいトピック(複数可)に加入。 (このコンシューマの自動コミットを無効にしてください。)

+0

ドキュメントごとにオフセットをリセットしようとしました。しかし、私はオプションの例外を認識できません。私のカフカのバージョンは0.10.2.0です。コマンドは:/home/centos/kafka_2.12-0.10.2.0/bin/kafka-consumer-groups.sh --bootstrap-server 1.1.1.1:9092,1.1.2:9092 - -reset-offsets --group sample-app-0.0.1 --topic abc --to-offset 76756952'例外: 'スレッドの例外" main "joptsimple.UnrecognizedOptionException:reset-offsetsが認識されないオプションです。このオプションは0.10.2.0では使用できません。 –

+0

'kafka_2.12-0.10.2.0/bin/kafka-consumer-groups.s h'を呼び出すので、古いツールを使用しています。 Kafka 1.0をダウンロードし、そこからツールを使用する必要があります。 –