2017-08-21 5 views
0

私のアプリケーション内のカフカトピックに複数のリスナーを設定することを検討しています。以下は私のセットアップです。両方のグループで消費されるはずですが、1つのリスナーだけが消費します。私はここで何が欠けていますか?スプリングカフカを使用している複数の消費者

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<String, Object>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); 

    return props; 
} 

@Bean 
public ConsumerFactory<String, String> consumerFactory() { 
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs()); 
    return consumerFactory; 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("notificationFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("insertContainerFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 

    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", group = "insert_listener", containerFactory = "insertContainerFactory") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

@KafkaListener(id = "notification_listener", topics = "${kafka.topic.readlocation}", group = "notification_listener",containerFactory="notificationFactory") 
public void receiveForNotification(String message) { 
    locationProcessor.processNotificationMessage(message); 
} 

編集:以下は、それぞれに異なるgroup.idが必要

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", groupId = "insert_listener") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

答えて

0

を働いていたコードがあります。 groupプロパティはgroup.idではありません - javadocsを参照してください。次回リリースの1.3では、新しいgroupIdプロパティがあり、グループとしてidを使用することもできます。

以前のバージョンでは、それぞれ異なる消費者ファクトリが必要です。

+0

これは完全に機能しました。ありがとう。私は1.3リリースにアップグレードし、group.idを使用しました。 –

関連する問題