私はKafkaを使用してSpring統合フローを実装しようとしています。しかし、私はこのエラーにつきものです。Kafka Spring統合フロー上のNULLポインタ
私がデバッグすると、 "HandlerAdapter handlerMethod"がnullであることがわかりますMessagingMessageListenerAdapter
です。私がここでやっていることを確かめたり、もっと必要な設定をしています。しかし、これは私が文書や他のページから見つけることができるすべてです。すべてのヘルプは
java.lang.NullPointerException: null
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:255) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:923) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:903) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:854) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:729) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:615) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
を高く評価していることはここでは私のpom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cyclone.streams</groupId>
<artifactId>kafka-poc</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>
<dependencies>
<!--Spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>1.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>1.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--Spring Kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<!--Other-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</pluginRepository>
</pluginRepositories>
そして
@Configuration
public class KafkaConsumerConfig {
@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from(kafkaMessageDrivenChannelAdapter())
.get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer());
kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public DirectChannel consumingChannel() {
return new DirectChannel();
}
@Bean
public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
ContainerProperties containerProps = new ContainerProperties("order-graph-timeline-topic");
ConcurrentMessageListenerContainer<String, String> container =
new ConcurrentMessageListenerContainer(consumerFactory(), containerProps);
return container;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");//TODO
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
ありがとうございました。バージョンに固執する理由はありません。 POCを行うので、最新のものを選んだだけです。だから私は春のカフカと春の統合カフカを1.3.1に変更しました。使用するのに最適なスプリングブートバージョンは何ですか? –
いいえ、 'spring-integration-kafka'はまだ' 2.3.0.RELEASE'と同じです。彼らは絶対に異なる人工物です。春のブートも同じです - '1.5.8.RELEASE'。私が提案しているのは、あなたのSpring Kafkaを '1.3.1.RELEASE'に変更することだけです。それでおしまい。 –
しかし、スプリングカフカは決してNPEを投げてはいけません。 GitHubの問題を開いてください。 –