2017-09-19 13 views
2

同じトピックを持つ複数のkafkaサーバーから読み込む必要があるリスナーが、すべて1人の飼い主の下に構成されています。それらの複数のサーバーからどのように読み込むのですか。これで助けてくれますか?spring-kafkaを使用してリスナー用に複数のブートストラップサーバーを渡す方法

カフカサーバーの代わりに、私は飼い葉桶に向けることができますか?

答えて

3

@KafkaListenerKafkaListenerContainerFactory@Beanを必要とし、これは次にConsumerFactoryに基づいています。そしてDefaultKafkaConsumerFactoryは、消費者のconfigsのMap<String, Object>を受け入れる:

その ConsumerConfig.BOOTSTRAP_SERVERS_CONFIGは正確に標準のApacheカフカ bootstrap.servers propertyある
@Configuration 
@EnableKafka 
public class KafkaConfig { 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
         kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
           new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); 
     ... 
     return props; 
    } 
} 

https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation

確立するために使用するホスト/ポートのペアのリストカフカクラスターへの最初の接続。クライアントは、ブートストラッピングのためにここで指定されたサーバに関係なく、すべてのサーバを使用します。このリストは、フルセットのサーバを検出するために使用された最初のホストにのみ影響します。このリストは、host1:port1、host2:port2、...という形式にする必要があります。これらのサーバーは、(動的に変更される可能性のある)完全なクラスターメンバーシップを検出するための初期接続に使用されるだけなので、このリストにはフルセットただし、サーバーがダウンしている場合は、複数のサーバーが必要になる場合があります。

いいえ、あなたはZookeeperのアドレスを指すことはできません。それはもはやカフカによってサポートされていません。

+1

新しい(0.9+)クライアントの目的は、クライアントコードが飼い葉桶に話をしなくて済むようにすることです。しかし、ブローカーリストを取得し、ブートストラップサーバーの設定プロパティを設定するために、zookeeperに問い合わせることはできます(私は推測します)。 –

+0

ありがとう、Artem。また、カフカには、サーバとクライアント間のオフセット管理を扱う_consumer-offsetsというトピックがあります。ですから、私が複数の動物園に対応する複数のサーバーを使用すると、スプリング・カフカが特にオフセット管理を行います。 – daemon54

+1

スプリングカフカはオフセットを管理しません。すべてはカフカブローカーで行われます。 'consumer.group'のみを提供し、Consumerインスタンスに提供されたオフセットに依存します。 –

関連する問題