0
@KafkaListenerアノテーションに基づいてカスタムメタアノテーションを作成しようとしていますが、spring-kafkaがエラーをスローします。ここで@ KafkaListenerのメタ注釈を作成する
は私の注釈です:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MessageListener {
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] value() default {};
}
マイコンシューマーサービス:
@EnableKafka
public class CarSubscriber extends BaseSubscriber<String, Car> {
private static final Logger LOGGER =
LoggerFactory.getLogger(CarSubscriber.class);
@MyMessageListener("${kafka.topic.car}")
public void receiveMessage(Car car) {
LOGGER.info("received car=" + car.toString());
getLatch().countDown();
}
}
エラースタックトレース:
java.lang.IllegalArgumentException: An array of topicPartitions must be provided
at org.springframework.util.Assert.notEmpty(Assert.java:228) ~[spring-core-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.kafka.listener.config.ContainerProperties.<init>(ContainerProperties.java:175) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.<init>(AbstractMessageListenerContainer.java:130) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.<init>(ConcurrentMessageListenerContainer.java:69) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:70) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:40) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:177) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:46) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:230) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
実は、私は私のメタ注釈内のトピック名を指定したい場合は、言い換えれば、上記の@MyMessageListenerの内部にあります。しかし、それは動作しません。
私は何か間違っているか、@ KafkaListenerの@AliasForアノテーションをサポートしていないSpring-kafkaですか?助けてください。