私はカフカを初めて利用しています。メッセージを送受信するためのコンシューマクラスとプロデューサクラスを実装しようとしました。ブローカのIPとポートのリストである両方のクラスに対して、bootstrap.servers
を,
で区切って設定する必要があります。例えば、 カフカのブローカーの接続文字列を取得するには
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
は、アプリケーションがクラスタのマスターノード上で実行されますので、それだけで
Kafka: Get broker host from ZooKeeperへの答えのよう
ZooKeeper
からブローカーの情報を取得することができるはずです。
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
しかしこのbrokerInfoは次のようになりJSON形式である: { "jmx_port": - 1、 "タイムスタンプ": "1428512949385"、 "ホスト": "192.168.0.11"、 "バージョン": 1、 "port":9093}
この記事では、次の方法で各ブローカの接続文字列を取得し、カンマで結合する方法を提案しました。
for (String id : ids) {
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null) {
brokerList.add(broker.connectionString());
}
}
このBroker
クラスがorg.apache.kafka.common.requests.UpdateMetadataRequest.Broker
からのものである場合、それは方法createBroker
とconnectionString
を持っていません。
さらに別の類似のポストが見つかったGetting the list of Brokers Dynamically。しかし、それはhost
とport
のようなブローカー情報から属性を取得する方法を述べていません。私はおそらく、それらを抽出する文字列のようなjsonのパーサーを書くことができますが、私はそれを行うためにカフカのネイティブな方法があると思う。助言がありますか?
編集:Broker
クラスがkafka.cluster.Broker
であることを認識しました。それでもメソッドconnectionstring()
はありません。
、それは将来性だかどうか:最新のカフカの生産者と消費者のクライアントにはありませんもう動物園に話してください。この変更以来、ほとんどのユーザーはZooKeeperへのアクセスをセキュリティ強化のためにKafkaブローカー(サーバー)以外のものに制限していました。このようなシナリオでは、クライアントアプリケーションはZooKeeperからブローカ関連の情報を抽出することができません。なぜなら、ZooKeeperは最初にZooKeeperと話すことが許可されていないからです。 –
@migunoそうです。私はプロデューサーか消費者のどちらかで動物園の接続を設定する必要はありませんでした。だから、私はブローカーのipsとポートを得るために何をすべきだとお勧めしますか?私は要求(マスター/ブートストラップのip)からURLを取得し、 'service/kafka/v1/connection'エンドポイントへのGET呼び出しを行うことができました。 apiを呼び出さずにこれを行う別の方法があるのかどうか疑問に思うだけです。 – ddd