2017-08-19 14 views
1

私は消費者に常に最新のオフセットから開始させたいユースケースがあります。このコンシューマのオフセットをコミットする必要はありません。これは、新しいコンシューマ・グループが常に新しく割り当てられたパーティションをコミットするため、spring-kafkaでは実現できません。次に、プログラムのその後の開始時に、消費者は最新のものからではなく、この記憶されたオフセットから読み出す。言い換えれば、新しい消費者グループの最初の最初のスタートが正しく動作する、すなわち最新のものから消費する。問題は、参考のためKafkaMessageListenerContainer$ListenerConsumer.onPartitionsAssigned()auto-offset-reset = latestはspring-kafkaで動作しません

であり、私は、コードが新しいコンシューマ・グループが消費し始めながら再分割が発生したときにいくつかの厄介な競合状態を解決するために追加されたことを春ブーツ

spring.kafka.listener.ack-mode=manual 
spring.kafka.consumer.auto-offset-reset=latest 
spring.kafka.consumer.enable-auto-commit=false 

答えて

1

で次のように設定します。構成によっては、レコードが失われたり重複したりする可能性があります。

これらの条件を回避するために、初期オフセットをコミットするのが最も良いと感じました。

私は、ユーザーがオフセット(MANUAL ackmodeを使用して)について完全な責任を負う場合、おそらくコミットを行わないことに同意します。レースに対処するのはユーザーコードに依存します(あなたの場合は、失われたレコードは気にしません)。

気軽にGitHubの問題を開くことができます(寄付を歓迎します)。

その間に、リスナーにConsumerSeekAwareを実装させ、割り当て中にトピック/パーティションの終了を求めるようにすることで回避できます。

別の方法として、毎回group.idにUUIDを使用する方法があります。あなたは常にトピックの終わりから始めるでしょう。

関連する問題