0
私はKafkaの0.9バージョンを消費者データに使用しますが、プログラムは常にconsumer.poll(100)の行に実行されます。私はkafka-clients-0.9.0.0のjarファイルを使います。
カフカ:kafka_2.10-0.9.0.0
飼育係:飼育係-3.4.5kafka-0.9は消費者データではありません
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.28.176.11:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("tmp"));
try {
while (true) {
System.out.println("start comsuming...");
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("start print data.......");
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
} finally {
consumer.close();
}
}
ログ:
私はここで言及している方法(http://stackoverflow.com/questions/37770024/kafka-0-9-0-1-java-consumer-stuck-in-awaitmetadataupdate)を使用して、変数ADVERTISED_PORTとADVERTISED_HOSTを設定しますしかし、それはまだ消費者にこだわっています。 –