2017-03-31 12 views
1

こんにちは、私は現在、Spring Kafkaでおしゃべりしており、単一のKafkaListenerContainerFactoryをリスナーに追加することに成功しました。今度は、複数のKafkaListenerContainerFactorysを追加したいと思います(jsonにメッセージを持つトピック用に1つ、文字列用にもう1つのトピック)。以下のコードを参照してください:複数のKafkaListenerContainerFactoriesを追加する際の問題

Description: 

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found. 
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory' 


Action: 

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration. 

は私が間違って何をやっている:

@EnableKafka 
@Configuration 
public class KafkaConsumersConfig { 

    private final KafkaConfiguration kafkaConfiguration; 

    @Autowired 
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) { 
     this.kafkaConfiguration = kafkaConfiguration; 
    } 

    @Bean 
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(jsonConsumerFactory()); 
     factory.setConcurrency(3); 
     factory.setAutoStartup(true); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String,Record> jsonConsumerFactory(){ 
     JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class); 
     return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer); 
    } 

    @Bean 
    public Map<String,Object> jsonConsumerConfigs(){ 
     Map<String,Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress()); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId()); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit()); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval()); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout()); 
     return propsMap; 
    } 
    @Bean 
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(fileConsumerFactory()); 
     factory.setConcurrency(3); 
     factory.setAutoStartup(true); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String,String> fileConsumerFactory(){ 
     return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs()); 
    } 

    @Bean 
    public Map<String,Object> fileConsumerConfigs(){ 
     Map<String,Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress()); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId()); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit()); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval()); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout()); 
     return propsMap; 
    } 
} 

が、これは私に次のエラーを与える実行していますか?

答えて

4

あなたがSpring BootのKafka Auto Configurationに依存しないように見えます。

春ブーツはKafkaAutoConfigurationで提供します。

@Bean 
@ConditionalOnMissingBean(ConsumerFactory.class) 
public ConsumerFactory<?, ?> kafkaConsumerFactory() { 

あなたはjsonConsumerFactoryfileConsumerFactoryを持っているので、彼らが自動設定によって提供されるものをオーバーライドします。

しかし、その一方で、KafkaAnnotationDrivenConfigurationで、あなたの工場の非を適用することができる。

@Bean 
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") 
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
     ConcurrentKafkaListenerContainerFactoryConfigurer configurer, 
     ConsumerFactory<Object, Object> kafkaConsumerFactory) { 

あなたConsumerFactory豆はConsumerFactory<Object, Object>タイプではないので。

ので:

  • だけでアプリケーションのプロパティに以下を追加することによって、春のブート自動設定からKafkaAutoConfigurationを除外するには、ファイル: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  • かでそれを上書きするkafkaListenerContainerFactoryにごKafkaListenerContainerFactory豆の名前を変更Boot
  • を入力するか、ConsumerFactory beanのいずれかをConsumerFactory<Object, Object>タイプにします。
関連する問題