2016-10-17 3 views
1

kafkaで接続された消費者のリストを取得するには?コンシューマーがブローカーに接続されているので、ZkClient/ZkUtilsのようなJavaユーティリティーがKafka 0.9.0.xの接続されたコンシューマーのリストを取得していますか?我々は、ユーティリティの下に使用してブローカーのリストを取得するために使用するように:特定の消費者がJavaを使用してカフカ0.9.0.xに接続しているかどうかをチェックする方法は?

 ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 60000); 

     if(zkClient!=null){ 
      List<String> brokerIds = zkClient.getChildren(ZkUtils.BrokerIdsPath()); 
      if(CollectionUtils.isNotEmpty(brokerIds) && brokerIds.contains(brokerId)){ 
       logger.debug("Broker:{{}} is connected to Zookeeper.",brokerId); 
       flag = true;  
      } 
      else{ 
       logger.error("ERROR:Broker:{{}} is not connected to Zookeeper.",brokerId); 
      } 
      zkClient.close(); 
     } 

私はMavenのからJavaのlibの下でカフカの0.9.0.xを使用しています:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

更新日:

私は 'kafka-console-consumer.bat'を開き、一度それを実行してからcmdプロンプトを越えました。それから、「zookeeper-shell.bat」とls /コンシューマと表示されましたが、プログラムされたコンシューマは表示されません。 zkClient.getChildren(ZkUtils.ConsumersPath())を使用して表示されるようになりました。

答えて

3

わからない。

このコードを使用するには、この依存関係をpomに追加します。その後

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

import org.apache.kafka.clients.CommonClientConfigs; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; 
import kafka.admin.AdminClient; 
import kafka.coordinator.GroupOverview; 

Properties props = new Properties(); 
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092"); 
AdminClient adminClient = AdminClient.create(props); 

List<GroupOverview> groups = scala.collection.JavaConversions.seqAsJavaList(
     adminClient.listAllConsumerGroupsFlattened()); 
for (GroupOverview group : groups) { 
    String groupId = group.groupId(); 

    Properties consProps = new Properties(); 
    consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092"); 
    consProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
    consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
    consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
    consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer consumer = new KafkaConsumer(consProps); 

    List<AdminClient.ConsumerSummary> groupSummaries = scala.collection.JavaConversions.seqAsJavaList(
      adminClient.describeConsumerGroup(groupId)); 

    System.out.println("GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER"); 

    for (AdminClient.ConsumerSummary summary : groupSummaries) { 
     String owner = summary.clientId() + "_" + summary.clientHost(); 
     List<TopicPartition> topicPartitions = scala.collection.JavaConversions.seqAsJavaList(
       summary.assignment()); 
     for (TopicPartition tp : topicPartitions) { 

      // Get current offset 
      long currentOffset = consumer.committed(tp).offset(); 

      // get log end offset 
      consumer.assign(Arrays.asList(tp)); 
      consumer.seekToEnd(); 
      long logEndOffset = consumer.position(tp); 

      long lag = logEndOffset - currentOffset; 

      System.out.println(groupId + ", " + tp.topic() + ", " + tp.partition() + ", " + 
        currentOffset + ", " + logEndOffset + ", " + lag + ", " + owner); 
     } 
    } 
} 
+0

ありがとう、ちょうど私が実行消費者のリストを取得する必要があった。これは "AdminClient" + "listAllConsumerGroupsFlattened()"メソッドを使用して達成されました。ものはまだkafakに隠されています。 – usman

0

これはほとんど同じですが、ZkUtils.ConsumersPath(=/consumers)をチェックする必要があります。

Zookeeperのコンシューマ構造は、次の/ consumers/[idsId]/ids/[consumerId]であるため、各グループのグループとコンシューマを取得できます。 0.9.xの新しい消費者のために

+0

ZkUtils.ConsumersPath(/消費者)は、常に[]返されました。私は消費者団体の情報がカフカに保存されていると思います。私はすでに消費者のリストを確認するためにこの部分を通過しました。 – usman

+1

0.9.xと0.10.xでは依然としてコンシューマグループとコンシューマが存続します。 ZkUtils.getConsumersはConsumersPathの子を取得します。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L864 – gasparms

+0

zkClient.getChildren(ZkUtils.ConsumersPath())は空の[]を返します。 – usman

0

と、すべてのアクティブな消費者団体をリスト:

  1. は、すべてのブローカーを見つけると、ブローカーのそれぞれに「ListGroups」リクエストを送信し、すべてのグループ情報を取得します。あなたは$KAFKA_HOME/bin/kafka-consumer-groups.shkafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService.list())を参照することができます詳細について

0.9.xの新しい消費者のために、一定のコンシューマ・グループの詳細な情報について説明します。

  1. コンシューマ・グループ・コーディネータを見つけ、それに「DescribeGroups」リクエストを送信し、すべてのグループ・メンバー情報とパーティション割当て情報を取得します。
  2. 特定のパーティションの最後にコミットされたオフセットを取得するには、KafkaConsumer.committed(TopicPartitionパーティション)を呼び出します。詳細について

あなたは$KAFKA_HOME/bin/kafka-consumer-groups.shを参照することができます(kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService.describe()

古い消費者と新しい消費者がそのことについて完全に異なる実装を持っていることに注意してください。(どちらもロジックは、kafka.admin.ConsumerGroupCommandに実装されています。まさに情報あなたが必要とするが、私はkafka-consumer-groups.sh --describeと同じ情報を提供するサンプルプログラムをやったん

+0

この編集は間違いであり、自分の質問に追加する必要がありました。申し訳ありませんが、それを破棄する方法はわかりません。 – usman

+0

私は窓にJavaコードを使用していますが、Kafka Windowsには今何をすべきか 'kafka-consumer-groups.bat'はありません。 – usman

関連する問題