カフカのトピックからメッセージのリストを読み込み、RESTサービスに送信するバネカフカバッチリスナーを実装しています。 RESTサービスが停止した場合のオフセット管理について理解したいと思います。バッチのオフセットはコミットしないでください。メッセージは次のポーリングのために処理する必要があります。私は春のカフカのドキュメントを読んだが、リスナーエラーハンドラと現在のコンテナエラーハンドラのシークをバッチで理解するのは混乱している。私はspring-boot-2.0.0.M7バージョンを使用しています。以下は私のコードです。バグkafkaバッチリスナー - バッチで手動でオフセットをコミットする
Listener Config:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency")));
// factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setBatchListener(true);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
env.getProperty("spring.kafka.consumer.enable-auto-commit"));
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
env.getProperty("spring.kafka.consumer.auto-commit-interval"));
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout"));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
return propsMap;
}
Listener Class:
@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> payloadList) throws Exception {
if (payloadList.size() > 0)
//Post to the service
}
Kafka Error Handler:
public class KafkaErrorHandler implements BatchErrorHandler {
private static Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
LOGGER.info("Exception occured while processing::" + thrownException.getMessage());
}
}
レコードのバッチ処理中に何かが発生した場合、データが失われないように、カフカリスナーを処理する方法。
もSeekToCurrentErrorHandlerハンドルバッチ操作をしていますか?なぜならSeekToCurrentBatchErrorHandlerクラスはorg.springframework.kafka.listenerパッケージで利用できないからです。また、エラーハンドラをseektocurrenterrorhandlerに設定すると、どのようにロギングが発生するのか教えてください。 – user8363477
'SeekToCurrentBatchErrorHandler'が1つありますが、' 2.1'以降は既にhttps://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#_seek_to_current_container_error_handlersです。コードを自分のクラスにアップグレードまたはコピー/貼り付けできます –