2016-10-06 13 views
5

hereのドキュメントによると、に記載されているように、リスナーにメッセージを取得するためにPOCを試しています。Spring Integration Kafka消費者リスナーがメッセージを受信しない

@Configuration 
public class KafkaConsumerConfig { 

    public static final String TEST_TOPIC_ID = "record-stream"; 

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}") 
    private String topic; 

    @Value("${kafka.address:localhost:9092}") 
    private String brokerAddress; 


    /* 
     @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
     KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> 
     kafkaMessageDrivenChannelAdapter = new 
     KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return 
     kafkaMessageDrivenChannelAdapter; } 

     @Bean public QueueChannel received() { return new QueueChannel(); } 
    */ 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(3); 
     factory.getContainerProperties().setPollTimeout(30000); 
     return factory; 

    } 

    /* 
    * @Bean public KafkaMessageListenerContainer<String, String> container() 
    * throws Exception { ContainerProperties properties = new 
    * ContainerProperties(this.topic); // set more properties return new 
    * KafkaMessageListenerContainer<>(consumerFactory(), properties); } 
    */ 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest 
                     // smallest 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

} 

とリスナーが

@Service 
public class Listener { 

    private Logger log = Logger.getLogger(Listener.class); 


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") 
    public void process(String message/* , Acknowledgment ack */) { 
     Gson gson = new Gson(); 
     Record record = gson.fromJson(message, Record.class); 

     log.info(record.getId() + " " + record.getName()); 
     // ack.acknowledge(); 
    } 

} 

、以下の通りである私は、同じトピックにメッセージを生産していますし、この消費者が同じトピックに取り組んでいるにもかかわらず、リスナーが実行されていません。

私はKafka 0.10.0.1を実行しています。現在は現在のpomです。このコンシューマは、多くのコマンドラインサンプルとは異なり、スプリングブートWebアプリケーションとして動作しています。

<dependencies> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-kafka</artifactId> 
      <version>2.1.0.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>com.google.code.gson</groupId> 
      <artifactId>gson</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

    </dependencies> 

は、私はそれが私が間違っているのされているもの、話題がメッセージを持っているときに、このリスナーが打たれていない理由を把握するために時間の良い量を費やしてきました。

私はチャンネルを使ってメッセージを受け取ることができることを知っています(私はそのコードの設定部分にコメントを付けました)が、ここでは並行処理はきれいです。

この種の実装は非同期メッセージ消費で可能ですか。

答えて

3

@EnableKafkaと一緒に@Configurationを追加する必要があります。

Will addすぐに説明があります。

一方:

@Configuration 
@EnableKafka 
public class KafkaConsumerConfig { 
+0

私は私のプロデューサーで、私はEnableKafkaを持っていない、EnableIntegrationはそのの世話をしていると思いました。私にこれを試してみましょう。 –

+0

'@ EnableIntegration'はSpring Integrationのためのものですが、ここではSpring Kafkaについて説明します。彼らは完全に異なる独立プロジェクトです。 '@ KafkaListener'はSpring Integration Kafkaの外にあります。 '@ EnableJms'、' @ EnableRabbit'など多くの '@Enable ...'があることを覚えておいてください。唯一の '@ EnableIntegration'はそれらのすべてを気にすることはできません。それはその責任ではありません。 –

+0

洞察力に感謝して、あなたの魅力のように働いています。これを覚えておいてください。 –

関連する問題