2016-06-20 9 views
15

生産と消費の仕事を開始する前にkafkaサーバーが稼動しているかどうかを確認したい。それは常に真であるので、この場合if (server != null)Kafka Serverが動作しているかどうかを確認するには?

Properties kafka = new Properties(); 
kafka.setProperty("broker.id", "1"); 
kafka.setProperty("port", "9092"); 
kafka.setProperty("log.dirs", "D://workspace//"); 
kafka.setProperty("zookeeper.connect", "localhost:2181");  
Option<String> option = Option.empty(); 
KafkaConfig config = new KafkaConfig(kafka);   
KafkaServer server = new KafkaServer(config, new CurrentTime(), option); 
server.startup(); 

が十分ではありません... Windows環境であり、ここで日食の私のカフカサーバーのコードです。それで、私のカフカサーバーが稼動していてプロデューサーの準備ができていることを知る方法はありますか?開始データパケットの一部が失われるため、これを確認する必要があります。

ありがとうございました。

+0

をAdminClientを使用することですか? – Maroun

+0

私は自分の質問を更新しました。 – Khan

答えて

12

すべてカフカのブローカーがbroker.idを割り当てる必要があります:あなたは サーバーが 〜/カフカ/ kafka.logに次のメッセージが表示されたときに正常に起動されたことを確認することができます。起動時に、ブローカは、Zookeeperの一時ノードを/broker/ids/$idというパスで作成します。ノードは一時的であるため、ブローカが切断するとすぐに削除されます。シャットダウンする。

あなたはそうのようにはかないブローカーノードのリストを表示することができます:ZooKeeperのクライアントインタフェースはコマンドの数を公開し

echo dump | nc localhost 2181 | grep brokers

dumpには、クラスタのすべてのセッションおよび一時ノードがリストされます。あなたはlocalhost上のデフォルトポート(2181)上でのZooKeeperを実行している

  • 、およびlocalhostこと
  • あなたzookeeper.connectカフカの設定がないクラスタのためのリーダーです:

    注、上記を前提としていそれだけでhost:portなくhost:port/path

0

Javaからだつまり、あなたが簡単に電話をかけることができますカフカクラスタ用のchrootのenvを指定します

[[email protected] hsperfdata_kafka]$ /usr/bin/kafka status 
Kafka is running with PID=17850. 

try-catchの中で囲みます。 kafkaで問題が発生した場合、例外ブロックが実行されます。 あなたのJavaプログラム上のものをクリーンアップするには、「finally」を使用します。

私は

+0

ここでサンプルを共有できますか –

1

パウロの答えは非常に良いです、それはカフカ& Zkを、ビューのブローカーの観点からどのように連携するかを実際にコードサンプルを共有しようとします。

私は)(カフカサーバーが実行されているかどうかを確認するには、別の簡単なオプションがclusteを指して、単純なKafkaConsumerを作成し、いくつかのアクションを試してみてください、例えば、listTopicsすることであると言うでしょう。kafkaサーバーが実行されていない場合は、TimeoutExceptionが表示され、try-catch文を使用できます。

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) 
    props.put("group.id", kafkaParams.get("group.id").get.toString) 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    val simpleConsumer = new KafkaConsumer[String, String](props) 
    simpleConsumer.listTopics() 
    } 
1

良いオプションは、あなたがどのようなOSを使用しているメッセージを生成したり消費し始める前に、以下のよう

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;   
try (AdminClient client = AdminClient.create(properties)) { 
      client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get(); 
     } catch (ExecutionException ex) { 
      LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS); 
      return; 
     } 
関連する問題