0
私のアプリケーション内のカフカトピックに複数のリスナーを設定することを検討しています。以下は私のセットアップです。両方のグループで消費されるはずですが、1つのリスナーだけが消費します。私はここで何が欠けていますか?スプリングカフカを使用している複数の消費者
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs());
return consumerFactory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean("notificationFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean("insertContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(100);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", group = "insert_listener", containerFactory = "insertContainerFactory")
public void receiveForInsert(String message) {
locationProcessor.insertLocationData(message);
}
@KafkaListener(id = "notification_listener", topics = "${kafka.topic.readlocation}", group = "notification_listener",containerFactory="notificationFactory")
public void receiveForNotification(String message) {
locationProcessor.processNotificationMessage(message);
}
編集:以下は、それぞれに異なるgroup.id
が必要
@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", groupId = "insert_listener")
public void receiveForInsert(String message) {
locationProcessor.insertLocationData(message);
}
これは完全に機能しました。ありがとう。私は1.3リリースにアップグレードし、group.idを使用しました。 –