私はKafkaを初めて使っています.iはカフカでやり始めました。私は以下の問題に直面しています。 最初はプロデューサAPIを作成していますが、正常に動作していますが、コンシューマAPIメッセージは表示されません。KafkaコンシューマAPIが正常に動作していません
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
String topic = "Hello-Kafka";
String group = "myGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "XXX.XX.XX.XX:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("records ::" + records);
System.out.println(records.toString());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record::" + record.offset());
System.out.println(record.key());
System.out.println(record.value());
}
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.commitSync();
consumer.close();
}
}
}
レスポンス::トピックこんにちは、カフカ 記録:: [email protected] 組織に加入
:
私のコードは次のようです。 [email protected]
ここオフセット、キーを印刷しないで、値 コントロールのために来ていない(ConsumerRecord記録:記録){ ループのためにそれは私を助けてください。
トピックにいくつかのメッセージを出しましたか? あなたのトピックのようにメッセージがありません。 – divyesh