Javaを使用してkafka
コンシューマを作成しています。私はメッセージのリアルタイムを保持したいので、1000以上のように消費を待っているメッセージが多すぎる場合は、消費されていないメッセージを放棄し、最新のオフセットから消費を開始する必要があります。kafkaトピックの最新のオフセットを取得するにはどうすればよいですか?
この問題では、最後にコミットされたオフセットとトピックの最新のオフセット(1パーティションのみ)を比較しようとします。これらの2つのオフセットの差が一定量より大きい場合は、その冗長なメッセージを放棄できるように、次のオフセットとしてトピックを追加します。
私の問題は、トピックの最新のオフセットを取得する方法ですが、古い消費者を使用できると言う人もいますが、それは複雑すぎます。新しい消費者にはこの機能がありますか?
クライアントの現在のオフセットと最新の既知のカフカトピックオフセットとの差異を計算したい場合は、これは機能しません。 – hiaclibe