このプロジェクトでは、kafkaと統合するためにspring-cloud-stream-binder-kafkaバージョン1.1.2を使用しています。最近、私たちのサービスの1つが起動後にトピックからの古いメッセージ(既に消費されている)を消費した状況がありました。そのトピックには2つのパーティションと2つのコンシューマがコンシューマ・グループにグループ化されています。オフセットが飼い主に適切にコミットされているかどうかはわかりません。起動時に各メッセージに対して以下のエラーメッセージが表示されます。spring-cloud-stream-kafkaアプリケーションの起動後に最新のメッセージを使用します。
[-kafka-listener-2] ERROR o.s.k.listener.LoggingErrorHandler.handle - Error while processing: ConsumerRecord(topic = statemachine_deal_notification, partition = 1, offset = 926, key = null, value = [[email protected])
これが再び起こらないように、私たちはいつもトピックの最新のメッセージのみを読んでみたいです。私はをtrue
に設定し、startOffset
をlatest
に設定することができます。しかし、これらの特性は消費者に影響を与えません。後でこの機能が削除されています。
特定のグループの消費者が最新のメッセージのみを消費するようにする方法はありますか?