2

を実行しているorg.apache.kafka.common.config.ConfigExceptionながら、私はコードそれは常に例外からカフカの消費者を実行しようとしたが、私はそれが正常に動作プロデューサーをチェックするkafka-console-consumer.shファイルを実行しましたブローカーが受け取ったすべてのメッセージを表示します。以下は、pom.xmlコードと例外ログです。親切に私が間違っている場所を教えてください。春の統合カフカは:消費者

public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:2181"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_coonfig"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.IntegerDeserializer"); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringDeserializer"); 
    return props; 
} 

ここは私のテストのクラスコードです。

@Test 
public void testSpringKafkaConsumer() throws InterruptedException { 

    try{ 
    String topics[] = { "programTopic3" }; 
    ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); 
    factory.createConsumer(); 
    AbstractMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(factory, 
      topics); 
    container.setBeanName("container"); 

    final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>(); 
    container.setMessageListener(new MessageListener<Integer, String>() { 

     @Override 
     public void onMessage(ConsumerRecord<Integer, String> message) { 
      // logger.info("received: " + message); 
      System.out.println("received: --------+++++++++++++++------------" + message); 
      records.add(message); 
     } 
    }); 
    KafkaMessageDrivenChannelAdapter<Integer, String> adaptor = new KafkaMessageDrivenChannelAdapter<>(container); 

    adaptor.start(); 
    ConsumerRecord<Integer, String> poll = null; 
    while((poll =records.take()) != null){ 
     System.out.println(poll.topic() + " topic"); 
     System.out.println(poll.key() + " key"); 
     System.out.println(poll.value()+ " value"); 
    } 

    }catch(Exception exception) 
    { 
     exception.printStackTrace(); 
     Assert.fail(); 
    } 
} 

のpom.xml

<?xml version="1.0" encoding="UTF-8"?> 

http://maven.apache.org/xsd/maven-4.0.0.xsd "> 4.0.0

<groupId>com.learn.kafka.integrate.spring</groupId> 
<artifactId>SpringIntegrationKafka</artifactId> 
<version>0.0.1-SNAPSHOT</version> 
<packaging>jar</packaging> 

<name>SpringIntegrationKafka</name> 
<description>Demo project for Spring Integration kafka</description> 

<properties> 
    <springVersion>4.2.5.RELEASE</springVersion> 
    <springIntegrationVersion>4.2.5.RELEASE</springIntegrationVersion> 
    <mockitoVersion>1.10.19</mockitoVersion> 
</properties> 
<repositories> 
    <repository> 
     <id>repository.spring.milestone</id> 
     <name>Spring Milestone Repository</name> 
     <url>http://repo.spring.io/milestone</url> 
    </repository> 
</repositories> 
<dependencies> 
    <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-api</artifactId> 
     <version>1.7.21</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-core</artifactId> 
     <version>${springIntegrationVersion}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-kafka</artifactId> 
     <version>2.0.0.M1</version> 
    </dependency> 
    <dependency> 
<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.10</artifactId> 
<version>0.9.0.1</version> 
</dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-test</artifactId> 
     <version>${springVersion}</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework.integration</groupId> 
     <artifactId>spring-integration-test</artifactId> 
     <version>${springVersion}</version> 
    </dependency> 
</dependencies> 
<build> 
    <plugins> 
     <plugin> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <version>3.3</version> 
      <configuration> 
       <source>1.8</source> 
       <target>1.8</target> 
      </configuration> 
     </plugin> 
    </plugins> 
</build> 

例外ログ:

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56) 
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512) 
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494) 
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:46) 
at com.learn.kafka.integrate.spring.TestConsumer.testSpringKafkaConsumer(TestConsumer.java:83) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) 
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) 
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) 
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
+0

あなたは全体的な話 'ConsumerFactory <整数、文字列を表示されません> factory = new DefaultKafkaConsumerFactory <>(configs); '' configs'変数は 'consumerConfigs()'で作成されたプロパティを参照していないようです。 –

答えて

2
org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value. 

new DefaultKafkaConsumerFactory<>(configs);のように見えますが、consumerConfigs()は使用していません。他の側から

KafkaMessageDrivenChannelAdapterは、そのctorの中で、まさにこのことを行います。

this.messageListenerContainer = messageListenerContainer; 
this.messageListenerContainer.setAutoStartup(false); 
this.messageListenerContainer.setMessageListener(this.listener); 

だから、あなたのcontainer.setMessageListener(new MessageListener<Integer, String>() {に到達できません。したがって、recordsには何も表示されません。

私はあなたがまだそれを理解していない場合は、この特定のテストのために春の統合を避けるお勧めします。

KafkaMessageDrivenChannelAdapterバリアントの場合は、としてoutputChannelを指定して、pollという形式のメッセージを取得する必要があります。

しかし、また、あなたはKafkaMessageDrivenChannelAdapter周りよりBeanFactoryものを行う必要があります。

詳細は、当社のテストケースを参照してください。また、あまりにも、カフカ-0.9に基づいてサンプルアプリケーションに注意を払うhttps://github.com/spring-projects/spring-integration-kafka/blob/master/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

を:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka

+0

貴重なフィードバックをありがとう。私はカフカのサーバポートでアドレスを持つ9092ポート番号を与えた場合、私はそれがうまく働いて同じコードを試してみましたが、私は飼育係ポート2181.を提供カフカの消費者のコンソールから実行するのであれば、私はこれで混乱しています。 – rahul

+0

はい、それは正しいです:http://stackoverflow.com/questions/34935596/zookeeper-usage-on-kafka-0-9-0 –

関連する問題