2017-09-20 30 views
1

私はリスナークラスにConsumerSeekAwareを実装しています。しかし、オーバーライドされたメソッドregisterSeekCallback、onPartitionsAssigned、onIdleContainerは呼び出されません。私の消費者設定ファイルとコンシューマファイルは以下の通りです。 間違いを特定できませんでした。Spring Kafka -Consumerが呼び出さないメソッドを呼び出す

を使用したスプリングカフカ:1.1.6バージョン。

消費者:

package com.test.kafka.binder; 
    import java.util.Map; 
    import org.apache.kafka.common.TopicPartition; 
    import org.springframework.kafka.annotation.KafkaListener; 
    import org.springframework.kafka.listener.ConsumerSeekAware; 
    import org.springframework.stereotype.Component; 
    @Component 
    public class Consumer implements ConsumerSeekAware{ 

    @KafkaListener(topics = "${kafka.topic}", group = "foo") 
    public void listen(String message) { 
     System.out.println("Received Messasge in group foo: " + message); 
    } 

    @Override 
    public void registerSeekCallback(ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("registerSeekCallback"); 

    } 

    @Override 
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("onPartitionsAssigned"); 
    } 

    @Override 
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { 
     // TODO Auto-generated method stub 
     System.out.println("onIdleContainer"); 

    } 
} 

消費者設定:

package com.test.kafka.binder; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 

@EnableKafka 
@Configuration 
public class KafkaConsumerConfig { 


    @Value("${kafka.bootstrap-servers}") 
    private Object bootstrapAddress; 


    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress); 
     props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      "test"); 
     props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
     props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> 
     kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory 
      = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 
} 
+0

電源を入れます。あなたがそれを理解できなければ、ログをどこかに投稿してください。 –

+0

@GrayRussell spring-kafkaのデバッグログから見つけることができませんでした。ここで共有されているログ=> http://text-share.com/view/a46d764a – Hariharan

答えて

0

これは、それがマスターの上に固定されているknown problemです。 1.1.7をリリースする必要があります。

ただし、1.2.1にアップグレードできる場合は、すでに修正されています。

あなたもアイドルイベントを有効にする必要があります。注:デバッグログを

factory.getContainerProperties().setIdleEventInterval(10_000L); 
+0

私のアップデートもご覧ください。 –

関連する問題