DBに処理された後、各レコードを承認するSpring KafkaListenerを使用しています。 DBへの書き込みに問題がある場合、私たちはレコードを承認しないので、オフセットはコンシューマーにコミットされません。これはうまく動作します。次に、次のポーリングで失敗したメッセージを取得して再試行します。 errorhandlerをリスナーに追加し、ConsumerAwareListenerErrorHandlerを呼び出し、失敗したメッセージオフセットに対してconsumer.seek()を実行しようとしました。期待は次の世論調査の間です、我々は失敗したメッセージを受け取るべきです。これは起こっていない。次のポーリングでは、新しいメッセージのみがフェッチされ、失敗したメッセージはフェッチされません。スプリングカフカ:カフカリスナー - 消費者.seek号
@Service
public class KafkaConsumer {
@KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler")
public void onMessage(ConsumerRecord<Integer, String> record,
Acknowledgment acknowledgment) throws Exception {
try {
msg = JaxbUtil.convertJsonStringToMsg(record.value());
onHandList = DCMUtil.convertMsgToOnHandDTO(msg);
TeradataDAO.updateData(onHandList);
acknowledgment.acknowledge();
recordSuccess = true;
LOGGER.info("Message Saved in Teradata DB");
} catch (Exception e) {
LOGGER.error("Error Processing On Hand Data ", e);
recordSuccess = false;
}
}
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException {
return (message, exception, consumer) -> {
this.listen3Exception = exception;
MessageHeaders headers = message.getHeaders();
consumer.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
}
Container Class
@Bean
public Map<Object,Object> consumerConfigs() {
Map<Object,Object> props = new HashMap<Object,Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
localhost:9092);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@SuppressWarnings("unchecked")
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
私は私のリスナーへの追加の消費者を試してみましたが、例外@KafkaListener(トピック=( "$ {kafka.input.stream.topic}")、containerFactory = "kafkaManualAckListenerContainerFactory") \t \tます。public voidのonMessage(ConsumerRecordに求めました<整数、文字列>レコード、 \t \t \t \t肯定応答確認、消費者の消費者)は例外{ }キャッチ(例外e){ \t \t \t consumer.seek(新しいorg.apache.kafka.common.TopicPartition(レコードをスロー.topic()、 record.partition())、record.offset()); \t \t \t throw e; nullの代わりにLoggingErrorHandler()というエラーハンドラが表示される – SKumar
コメントにコードを入れないでください。読むのは難しいです。代わりに質問を編集してください。うん。私はhttps://github.com/spring-projects/spring-kafka/issues/470で作業を始めました。次の日に修正される予定です。 –
私は今のところ、ConsumerAwareErrorHandler(コンテナ内で、リスナーエラーハンドラではない)を使用し、 'Error'をスローするしかないと思います。 –