私はカフカの高レベル消費者です。カフカの消費者状態を確認する方法
public class KafkaHighLevelConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public KafkaHighLevelConsumer(int id,
String groupId,
List<String> topics,BlockingQueue<String> storyQueue) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
}finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
消費者は問題なく動作しますが、消費者の状態を監視する必要があります。 サーバのIPアドレスまたはポート番号が間違っていると、例外が発生します。
ポートを何か正しくないものに変更すると、props.put("bootstrap.servers", "localhost:9091");
からprops.put("bootstrap.servers", "localhost:100500");
に変わりますが、それでも例外はありません。
私はカフカに正常に接続しているかどうかを知りたいですか?そのようなケースを扱うことは可能ですか?
私は、このようなMavenを使うには、
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
感謝をDEPS!
kafka-consumer-groups.shを使用してトピックのコンシューマとその状態を表示することができます – MGolovanov
クライアントはブローカが停止している可能性が高いとみなすため、例外はありません。ブローカがオンラインになると、クライアントはブローカに接続します。 –