2017-07-13 9 views
0

n個のパーティションを持つトピックを提供しました。 SpringのKafkaConsumerリスナーが、同じ/ 1つのパーティションからの複数のメッセージを一度に聞く方法はありますか?単一のSpringのKafkaConsumerリスナーは、同じ/ 1つのパーティションからの複数のメッセージをリッスンできますか?

setBatchListener(true);を設定してConcurrentKafkaListenerContainerFactoryを試しましたが、コンシューマは1つではなく複数のパーティションから複数のメッセージを消費し始めました。提供

public class BatchReceiverConfig { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 

     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     // maximum records per batch receive 
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(2); 
     // enable batch listeners 
//  factory.setBatchListener(true); 

     return factory; 
    } 

    @Bean 
    public BatchReceiver receiver() { 
     return new BatchReceiver(); 
    } 
} 


/* Listener */ 
@KafkaListener(id = "batch-listener", topics = TOPIC_TEST_BATCH) 
    public void receive(List<String> data, 
         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, 
         @Header(KafkaHeaders.OFFSET) List<Long> offsets) { 

     LOGGER.info("start of batch receive"); 
     for (int i = 0; i < data.size(); i++) { 
      LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i), 
        partitions.get(i) + "-" + offsets.get(i)); 
      // handle message 

      latch.countDown(); 
     } 
     LOGGER.info("end of batch receive"); 
    } 
+0

質問を言い換えると、「1つのリスナが1つのパーティションからのメッセージのみをどのように消費できますか?」あれは正しいですか? – Arek

+0

それは正しいが、スプリングカフカモジュールの文脈である。 – Rites

答えて

0

は、グループ内の唯一の消費者が、あなたはそのConcurrentKafkaListenernへのconcurrencyLevelを設定することができますが存在します。

+0

これはうまくいかず、「max.poll.records」はフェッチするレコードの数を決めるプロパティだと思います。 – Rites

+0

Spring KafkaとKafkaのコンシューマそのものを設定する方法を教えてください。 – Arek

+0

質問にスニペットを追加しました。 – Rites

0

グループ管理を使用すると、Kafkaはコンシューマにパーティションを割り当てます。各コンシューマ(または同時実行性を使用するスレッド)は、コンシューマとパーティションの数に応じて、0,1、またはそれ以上のパーティションを取得します。

特定のコンシューマに1つのパーティションからのメッセージを取得させる場合は、カフカに割り当てを行わせる代わりに、パーティションを自分で割り当てる必要があります。

コンカレントコンテナを使用する場合は、並行処理と同じ数のパーティションを割り当てる必要があり、各スレッドは1つのパーティションしか取得できません。

+0

スニペットで問題が投稿されました。パーティション数=並行性があり、それでも動作しません。また、コンシューマ数=パーティション数。 – Rites

+0

グループ管理を使用している場合は、パーティションの配布に時間がかかります。当初、消費者はそれらをすべて得るでしょう。唯一の保証は、グループ管理を使用する代わりに、パーティションを自分で割り当てることです。 'props.put(ConsumerConfig.GROUP_ID_CONFIG、" batch4 ");' –

+0

クラスタ化された環境では、これを同じコードとして定義するのは難しいので、すべてのノードに展開されます。それを動的に設定する方法はありますか? – Rites

関連する問題