2017-12-21 12 views
1

私は新しいコンシューマー(pythonコンシューマー)のリストを持っています。私は、このコマンドを使用してグループを取得することができます特定のカフカトピックに接続している(new-)コンシューマーを見る方法

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list 

私は話題彼らは私もすべてのグループ(好ましくはすべての消費者を得ることができますどのように

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group TheFoundGroupId 
  1. に接続されているために、各1のために取得することができますグループに参加していない場合)、トピックに接続していますか?
  2. これをシェルコマンドとして実行する以外に、Pythonからアクセスする方法はありますか?

答えて

0

それは最善の解決策ではないのですが、誰も答えを持っていると思わないので、私は最後(消費者にグループを割り当てて交換___YOURGROUP____)でそれを解決する方法を、ここにある:

import subprocess 
    import os 
    if "KAFKA_HOME" in os.environ: 
     kafkapath = os.environ["KAFKA_HOME"] 
    else: 
     kafkapath = oms_cfg.kafka_home 
     # error("Please set up $KAFKA_HOME environment variable") 
     # exit(-1) 

    instances = [] 
    # cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server {} --list'.format(oms_cfg.bootstrap_servers) 
    # result = subprocess.run(cmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    igr = ____YOURGROUP_____ # or run over all groups from the commented out command 
    print("Checking topics of consumer group {}".format(igr)) 
    topic_cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server ' + oms_cfg.bootstrap_servers + ' --describe --group {gr}' 
    result = subprocess.run(topic_cmd.format(gr=igr).split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    table = result.stdout.split(b'\n') 
    # You could add a loop over topic here 
    for iline in table[1:]: 
     iline = iline.split() 
     if not len(iline): 
      continue 
     topic = iline[0] 
     # we could check here for the topic. multiple consumers in same group -> only one will connect to each topic 
     # if topic != oms_cfg.topic_in: 
     #  continue 
     client = iline[-1] 
     instances.append(tuple([client, topic])) 
     # print("Client {} Topic {} is fine".format(client, topic)) 
    if len(instances): 
     error("Cannot start. There are currently {} instances running. Client/topic {}".format(len(instances), 
                            instances)) 
     exit(-1) 
1

ありがとうこの質問をする。

コンシューマグループIDのようなすべてのコンシューマ構成。コンシューマグループIDは、どのトピックにサブスクライブされているかは、zookeeperに格納されています。その後、2181

LS /消費者

を実行します。

./bin/zookeeper-shell localhostとのZooKeeperに接続を取得するためのコマンド以下

ラン

あなたはすべての消費者グループを取得します現在存在する。消費者団体も提供していない場合。 Kafkaはランダムな消費者グループを割り当てます。コンソール消費者のために、それはあなたが

Pythonのスニペット下からのすべての消費者団体が接続されている消費者や消費者グループを取得するには zookeeper python client

from kazoo.client import KazooClient 

zk = KazooClient(hosts='127.0.0.1:2181') 
zk.start() 


# get all consumer groups 
consumer_groups = zk.get_children("/consumers") 
print("There are %s consumer group(s) with names %s" % (len(consumer_groups), consumer_groups)) 

# get all consumers in group 
for consumer_group in consumer_groups: 
    consumers = zk.get_children("/consumers/"+consumer_group) 
    print("There are %s consumers in %s consumer group. consumer are : %s" % (len(consumers), consumer_group, consumers)) 

をインストール得ることができますコンソール - 消費者-XXXXXのID

を割り当てますトピック。

取得/消費者/ consumergroup_id/IDS/consumer_id/

サブスクリプションの下

{"version":1,"subscription":{"test":1},"pattern":"white_list","timestamp":"1514218381246"} 

ようにあなたの出力を与える消費者が加入し、すべてのトピックをオブジェクト。あなたのユースケースに応じてロジックを実装してください

ありがとう

+0

これは残念ながら "新しいスタイル"のコンシューマでは機能しません。リストは空です。 –

+0

このリストは空ですか?消費者または消費者団体? @コリン-SBI – shakeel

関連する問題