2017-04-22 13 views
0

私はKafkaを初めて使っています.iはカフカでやり始めました。私は以下の問題に直面しています。 最初はプロデューサAPIを作成していますが、正常に動作していますが、コンシューマAPIメッセージは表示されません。KafkaコンシューマAPIが正常に動作していません

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 


public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "XXX.XX.XX.XX:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 


      ConsumerRecords<String, String> records = consumer.poll(100); 

      System.out.println("records ::" + records); 
      System.out.println(records.toString()); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.println("Record::" + record.offset()); 
       System.out.println(record.key()); 
       System.out.println(record.value()); 
      } 
      consumer.commitSync(); 

     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 

レスポンス::トピックこんにちは、カフカ 記録:: [email protected] 組織に加入

私のコードは次のようです。 [email protected]

ここ

オフセット、キーを印刷しないで、値 コントロールのために来ていない(ConsumerRecord記録:記録){ ループのためにそれは私を助けてください。

+0

トピックにいくつかのメッセージを出しましたか? あなたのトピックのようにメッセージがありません。 – divyesh

答えて

0

空のレコードを印刷しようとしているため、コードにrecords.toString()だけが印刷されます。これは本質的にクラスの名前です。
私はあなたのコードにいくつかの変更を加えて動作させました。これが役立つかどうか見てください。

public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 

      while(true){ 
       ConsumerRecords<String, String> records = consumer.poll(1000); 
       if(records.isEmpty()){ 

       } 
       else{ 
       System.out.println("records ::" + records); 
       System.out.println(records.toString()); 
       for (ConsumerRecord<String, String> record : records) { 
        System.out.println("Record::" + record.offset()); 
        System.out.println(record.key()); 
        System.out.println(record.value()); 
       } 
       consumer.commitSync(); 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 
+0

多くのありがとうございました。それは働いています。前に私は小さな間違いをしました。 – Narasimha

関連する問題