n個のパーティションを持つトピックを提供しました。 SpringのKafkaConsumerリスナーが、同じ/ 1つのパーティションからの複数のメッセージを一度に聞く方法はありますか?単一のSpringのKafkaConsumerリスナーは、同じ/ 1つのパーティションからの複数のメッセージをリッスンできますか?
setBatchListener(true);
を設定してConcurrentKafkaListenerContainerFactory
を試しましたが、コンシューマは1つではなく複数のパーティションから複数のメッセージを消費し始めました。提供
public class BatchReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// maximum records per batch receive
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
// enable batch listeners
// factory.setBatchListener(true);
return factory;
}
@Bean
public BatchReceiver receiver() {
return new BatchReceiver();
}
}
/* Listener */
@KafkaListener(id = "batch-listener", topics = TOPIC_TEST_BATCH)
public void receive(List<String> data,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
LOGGER.info("start of batch receive");
for (int i = 0; i < data.size(); i++) {
LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i),
partitions.get(i) + "-" + offsets.get(i));
// handle message
latch.countDown();
}
LOGGER.info("end of batch receive");
}
質問を言い換えると、「1つのリスナが1つのパーティションからのメッセージのみをどのように消費できますか?」あれは正しいですか? – Arek
それは正しいが、スプリングカフカモジュールの文脈である。 – Rites