0

私はカフカ0.10.0とzookeeper 3.4.6をプロダクションサーバーに使用しています。それぞれ約50のパーティションで20のトピックを持ちます。私は100の消費者のそれぞれが異なる話題とパーティションに加入している。すべての消費者は同じgroupIdを持っている。消費者が特定のトピックに対して追加または削除された場合、異なるトピックに関連付けられた消費者もリバランスを実行するケースがありますか?カフカコンシューマーグループIDとコンシューマーリバランスの問題

マイ消費者のコードは次のとおりです。

public static void main(String[] args) { 
     String groupId = "prod" 
     String topicRegex = args[0] 
     String consumerTimeOut = "10000" 
     int n_threads = 1 
     if (args && args.size() > 1) { 
      ConfigLoader.init(args[1]) 
     } 
     else { 
      ConfigLoader.init('development') 
     } 
     if(args && args.size() > 2 && args[2].isInteger()){ 
      n_threads = (args[2]).toInteger() 
     } 

     ExecutorService executor = Executors.newFixedThreadPool(n_threads) 
     addShutdownHook(executor) 
     String zooKeeper = ConfigLoader.conf.zookeeper.hostName 
     List<Runnable> taskList = [] 
     for(int i = 0; i < n_threads; i++){ 
      KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut) 
      taskList.add(example) 
     } 
     taskList.each{ task -> 
      executor.submit(task) 
     } 
     executor.shutdown() 
     executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) 
    } 

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) { 

     Properties props = new Properties() 
     props.put("zookeeper.connect", a_zookeeper) 
     props.put("group.id", a_groupId) 
     props.put("zookeeper.session.timeout.ms", "10000") 
     props.put("rebalance.backoff.ms","10000") 
     props.put("zookeeper.sync.time.ms","200") 
     props.put("rebalance.max.retries","10") 
     props.put("enable.auto.commit", "false") 
     props.put("consumer.timeout.ms", consumerTimeOut) 
     props.put("auto.offset.reset", "smallest") 
     return new ConsumerConfig(props) 

    } 

public void run(String topicRegex) { 
     String threadName = Thread.currentThread().getName() 
     logger.info("{} [{}] main Starting", TAG, threadName) 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>() 
     List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex),1) 
     ConsumerConnector consumerConnector = consumer 

     for (final KafkaStream stream : streams) { 
      ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator() 
      List<Object> batchTypeObjList = [] 
      String topic 
      String topicObjectType 
      String method 
      String className 
      String deserialzer 
      Integer batchSize = 200 
      while (true){ 
       boolean hasNext = false 
       try { 
        hasNext = consumerIterator.hasNext() 
       } catch (InterruptedException interruptedException) { 
        //if (exception instanceof InterruptedException) { 
        logger.error("{} [{}]Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage()) 
        throw interruptedException 
        //} else { 
       } catch(ConsumerTimeoutException timeoutException){ 
        logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage()) 
        topicListMap.each{ eachTopic, value -> 
         batchTypeObjList = topicListMap.get(eachTopic) 
         if(batchTypeObjList != null && !batchTypeObjList.isEmpty()) { 
          def dbObject = topicConfigMap.get(eachTopic) 
          logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic) 
          className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
          method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
          int sleepTime = 0 
          if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null) 
           sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger() 
          executeMethod(className, method, batchTypeObjList) 
          batchTypeObjList.clear() 
          topicListMap.put(eachTopic,batchTypeObjList) 
          sleep(sleepTime) 
         } 
        } 
        consumer.commitOffsets() 
        continue 
       } catch(Exception exception){ 
        logger.error("{} [{}]Exception: {}", TAG, threadName, exception.getMessage()) 
        throw exception 
       } 
       if(hasNext) { 
        def consumerObj = consumerIterator.next() 
        logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition()) 
        topic = consumerObj.topic() 
        DBObject dbObject = topicConfigMap.get(topic) 
        logger.debug("{} [{}] topic name: {}", TAG, threadName, topic) 
        topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY) 
        deserialzer = KafkaConfig.DEFAULT_DESERIALIZER 
        if(KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)){ 
         deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType) 
        } 
        className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
        method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
        boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY) 
        if(dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null) 
         batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) 
        else 
         batchSize = 1 
        Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message()) 
        int sleepTime = 0 
        if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null) 
         sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger() 
        if(isBatchJob == true){ 
         batchTypeObjList = topicListMap.get(topic) 
         batchTypeObjList.add(queueObj) 
         if(batchTypeObjList.size() == batchSize) { 
          executeMethod(className, method, batchTypeObjList) 
          batchTypeObjList.clear() 
          sleep(sleepTime) 
         } 
         topicListMap.put(topic,batchTypeObjList) 
        } else { 
         executeMethod(className, method, queueObj) 
         sleep(sleepTime) 
        } 
        consumer.commitOffsets() 
       } 
      } 
      logger.debug("{} [{}] Shutting Down Process ", TAG, threadName) 
     } 
    } 

任意の助けがにappriciatedされます。

+0

はい。すべてのコンシューマ・インスタンスが同じグループIDを共有する場合、コンシューマ・カウントの変更はグループのリバランスをトリガします。 – amethystic

答えて

2

コンシューマがコンシューマグループを離れるか、コンシューマグループに参加するたびに、グループ全体がリバランスを実行します。このグループは、メンバーが購読しているすべてのトピックのすべてのパーティションを思考しているので、問題のトピックに登録されていない消費者を再調整する可能性があります。

私は2つのトピックtest1(2パーティション)とtest2(9パーティション)を持つブローカーを持っていて、同じ消費者グループの両方で2人のコンシューマーを開始しています2つのトピックのうちの1つ。ご覧のとおり、consumer2がグループに参加すると、グループ全体が再調整されるため、consumer1はすべてのパーティションを取り消して再割り当てします。

Subscribing consumer1 to topic test1 
Starting thread for consumer1 
Polling consumer1 
consumer1 got 0 partitions revoked! 
consumer1 got 2 partitions assigned! 
Polling consumer1 
Polling consumer1 
Polling consumer1 
Subscribing consumer2 to topic test2 
Starting thread for consumer2 
Polling consumer2 
Polling consumer1 
consumer2 got 0 partitions revoked! 
Polling consumer1 
Polling consumer1 
consumer1 got 2 partitions revoked! 
consumer2 got 9 partitions assigned! 
consumer1 got 2 partitions assigned! 
Polling consumer2 
Polling consumer1 
Polling consumer2 
Polling consumer1 
Polling consumer2