2017-05-12 6 views
0

私はKafkaの0.9バージョンを消費者データに使用しますが、プログラムは常にconsumer.poll(100)の行に実行されます。私はkafka-clients-0.9.0.0のjarファイルを使います。
カフカ:kafka_2.10-0.9.0.0
飼育係:飼育係-3.4.5kafka-0.9は消費者データではありません

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "10.28.176.11:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("tmp")); 
    try { 
     while (true) { 
      System.out.println("start comsuming..."); 
      ConsumerRecords<String, String> records = consumer.poll(100); 
      System.out.println("start print data......."); 
      for (ConsumerRecord<String, String> record : records) 
       System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
     } 
    } finally { 
     consumer.close(); 
    } 
} 

ログ:

enter image description here

+0

私はここで言及している方法(http://stackoverflow.com/questions/37770024/kafka-0-9-0-1-java-consumer-stuck-in-awaitmetadataupdate)を使用して、変数ADVERTISED_PORTとADVERTISED_HOSTを設定しますしかし、それはまだ消費者にこだわっています。 –

答えて

0

私は考え出しました。私は/ borkers/admin/comsumers/config/controller/controller_epoch/isr_change_notificationのように、zookeeperのすべてのkafkaメタデータを削除し、kafkaを再起動しました。間違いはなくなりました。私は多分カフカクラスターにいくつかの誤りがあると思う。

関連する問題