私がenable.auto.commit=false
を持っていて、consumer.commitAsync()
と呼ぶことなくconsumer.poll()
を呼び出すと、なぜconsumer.poll()
は次回の呼び出し時に 新しいレコードを返しますか?Consumer.poll()はオフセットをコミットせずに新しいレコードを返しますか?
私はオフセットをコミットしなかったので、poll()
は同じレコードであるはずの最新のオフセットを返します。
処理中に障害シナリオを処理しようとしているため、私は尋ねています。私はオフセットをコミットせずに、poll()
が再び同じレコードを返すことを望んでいたので、それらの失敗したレコードを再度処理することができます。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
意味があります。しかし、[poll()doc](https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long))には、「最後に消費されたオフセットは手動でseek(TopicPartition、long)で設定するか、購読済みパーティションのリストの最後にコミットされたオフセットとして自動的に設定されます。 最後のコミットされたオフセットに消費されたオフセットを設定する、新しいオフセットをコミットしないと 'poll()'が新しいレコードを返さないようにする必要があります。私の理解は正しいのですか? – Glide
私はドキュメンテーションの部分は消費の出発点/オフセットを指していると思います。だからあなたはどちらかを開始またはコミットされたオフセットを使用してどこからでも起動します。 – ftr
'poll()'が最初に呼び出されたときにのみコミットされたオフセットを使用することを意味しますか? – Glide