2017-03-01 15 views
1

私はカフカを初めて利用しています。メッセージを送受信するためのコンシューマクラスとプロデューサクラスを実装しようとしました。ブローカのIPとポートのリストである両方のクラスに対して、bootstrap.servers,で区切って設定する必要があります。例えば、 カフカのブローカーの接続文字列を取得するには

producerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 

は、アプリケーションがクラスタのマスターノード上で実行されますので、それだけで Kafka: Get broker host from ZooKeeperへの答えのよう ZooKeeperからブローカーの情報を取得することができるはずです。

public static void main(String[] args) throws Exception { 
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null); 
    List<String> ids = zk.getChildren("/brokers/ids", false); 
    for (String id : ids) { 
     String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null)); 
     System.out.println(id + ": " + brokerInfo); 
    } 
} 

しかしこのbrokerInfoは次のようになりJSON形式である: { "jmx_port": - 1、 "タイムスタンプ": "1428512949385"、 "ホスト": "192.168.0.11"、 "バージョン": 1、 "port":9093}

この記事では、次の方法で各ブローカの接続文字列を取得し、カンマで結合する方法を提案しました。

for (String id : ids) { 
     String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null)); 
     Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString); 
     if (broker != null) { 
      brokerList.add(broker.connectionString()); 
     } 
    } 

このBrokerクラスがorg.apache.kafka.common.requests.UpdateMetadataRequest.Brokerからのものである場合、それは方法createBrokerconnectionStringを持っていません。

さらに別の類似のポストが見つかったGetting the list of Brokers Dynamically。しかし、それはhostportのようなブローカー情報から属性を取得する方法を述べていません。私はおそらく、それらを抽出する文字列のようなjsonのパーサーを書くことができますが、私はそれを行うためにカフカのネイティブな方法があると思う。助言がありますか?

編集:Brokerクラスがkafka.cluster.Brokerであることを認識しました。それでもメソッドconnectionstring()はありません。

+0

、それは将来性だかどうか:最新のカフカの生産者と消費者のクライアントにはありませんもう動物園に話してください。この変更以来、ほとんどのユーザーはZooKeeperへのアクセスをセキュリティ強化のためにKafkaブローカー(サーバー)以外のものに制限していました。このようなシナリオでは、クライアントアプリケーションはZooKeeperからブローカ関連の情報を抽出することができません。なぜなら、ZooKeeperは最初にZooKeeperと話すことが許可されていないからです。 –

+0

@migunoそうです。私はプロデューサーか消費者のどちらかで動物園の接続を設定する必要はありませんでした。だから、私はブローカーのipsとポートを得るために何をすべきだとお勧めしますか?私は要求(マスター/ブートストラップのip)からURLを取得し、 'service/kafka/v1/connection'エンドポイントへのGET呼び出しを行うことができました。 apiを呼び出さずにこれを行う別の方法があるのか​​どうか疑問に思うだけです。 – ddd

答えて

1

あなたは以下のショーとして、クラスタ内のすべてのブローカーの情報を取得するためにZkUtilsを使用することができます。このアプローチに関する一般的な注意事項

ZkUtils zk = ZkUtils.apply("zkHost:2181", 6000, 6000, true); 
List<Broker> brokers = JavaConversions.seqAsJavaList(zk.getAllBrokersInCluster()); 
for (Broker broker : brokers) { 
    //assuming you do not enable security  
    System.out.println(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host()); 
} 
zk.close(); 
関連する問題