1

私はKafkaを使用してSpring統合フローを実装しようとしています。しかし、私はこのエラーにつきものです。Kafka Spring統合フロー上のNULLポインタ

私がデバッグすると、 "HandlerAdapter handlerMethod"がnullであることがわかりますMessagingMessageListenerAdapterです。私がここでやっていることを確かめたり、もっと必要な設定をしています。しかし、これは私が文書や他のページから見つけることができるすべてです。すべてのヘルプは

java.lang.NullPointerException: null 
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:255) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:923) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:903) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:854) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:729) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:615) [spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144] 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144] 
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144] 

を高く評価していることはここでは私のpom.xml

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 

<groupId>com.cyclone.streams</groupId> 
<artifactId>kafka-poc</artifactId> 
<version>1.0</version> 

<parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-parent</artifactId> 
    <version>1.5.8.RELEASE</version> 
</parent> 

<dependencies> 
    <!--Spring boot--> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-web</artifactId> 
     <version>1.5.8.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-integration</artifactId> 
     <version>1.5.8.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-test</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <!--Spring Kafka--> 
    <dependency> 
     <groupId>org.springframework.kafka</groupId> 
     <artifactId>spring-kafka</artifactId> 
     <version>2.0.1.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-kafka</artifactId> 
     <version>2.3.0.RELEASE</version> 
    </dependency> 
    <!--Other--> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
    </dependency> 
</dependencies> 

<properties> 
    <java.version>1.8</java.version> 
</properties> 


<build> 
    <plugins> 
     <plugin> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-maven-plugin</artifactId> 
     </plugin> 
    </plugins> 
</build> 

<repositories> 
    <repository> 
     <id>spring-releases</id> 
     <url>https://repo.spring.io/libs-release</url> 
    </repository> 
</repositories> 
<pluginRepositories> 
    <pluginRepository> 
     <id>spring-releases</id> 
     <url>https://repo.spring.io/libs-release</url> 
    </pluginRepository> 
</pluginRepositories> 

そして

@Configuration 
public class KafkaConsumerConfig { 

    @Bean 
    public IntegrationFlow flow() { 
     return IntegrationFlows 
       .from(kafkaMessageDrivenChannelAdapter()) 
       .get(); 
    } 

    @Bean 
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() { 
     KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = 
       new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer()); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel()); 
     return kafkaMessageDrivenChannelAdapter; 
    } 

    @Bean 
    public DirectChannel consumingChannel() { 
     return new DirectChannel(); 
    } 

    @Bean 
    public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() { 
     ContainerProperties containerProps = new ContainerProperties("order-graph-timeline-topic"); 
     ConcurrentMessageListenerContainer<String, String> container = 
       new ConcurrentMessageListenerContainer(consumerFactory(), containerProps); 
     return container; 
    } 

    @Bean 
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");//TODO 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return props; 
    } 

} 

答えて

1

spring-integration-kafka-2.3.0.RELEASEようなコードであるspring-kafka-2.0.1.RELEASEと互換性がありません。同様に、スプリングブート1.5.8.RELEASEはそのバージョンと互換性がありません。

スプリングカフカ2.0を使用する理由がありますか?これは、スプリング5.0ベースライン用に設計されています。

スプリングカフカ1.3.1.RELEASEを使用することを検討してください。 Apache Kafka 0.11 +と完全に互換性がありますが、同時に、前述のSpring Integration Kafka 2.3と互換性があります。

+0

ありがとうございました。バージョンに固執する理由はありません。 POCを行うので、最新のものを選んだだけです。だから私は春のカフカと春の統合カフカを1.3.1に変更しました。使用するのに最適なスプリングブートバージョンは何ですか? –

+0

いいえ、 'spring-integration-kafka'はまだ' 2.3.0.RELEASE'と同じです。彼らは絶対に異なる人工物です。春のブートも同じです - '1.5.8.RELEASE'。私が提案しているのは、あなたのSpring Kafkaを '1.3.1.RELEASE'に変更することだけです。それでおしまい。 –

+1

しかし、スプリングカフカは決してNPEを投げてはいけません。 GitHubの問題を開いてください。 –

関連する問題