2017-06-26 8 views
1

私はカフカの初心者で、簡単なカフカの消費者/生産者の例をKafkaConsumerKafkaProducerで実行しています。コンシューマーが端末からコンシューマーを実行しているとき、コンシューマーはメッセージを受信して​​いますが、Javaコードを使用して聞くことはできません。 StackoverFlowでも同様の問題を検索しましたが(Links:Link1Link2)、その解決策を試しましたが、何も私にとってうまくいかないようです。 カフカバージョン:kafka_2.10-0.10.2.1および対応する依存関係がpomで使用されています。シンプルなカフカ消費者がメッセージを受け取っていない

生産者と消費者のためのJavaコード:

public class SimpleProducer { 
public static void main(String[] args) throws InterruptedException { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9094"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

    Producer<String, String> producer = new KafkaProducer<>(props); 
    for(int i = 0; i < 10; i++) 
     producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i))); 

    producer.close(); 

}} 

public class SimpleConsumer { 

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9094"); 
    props.put("group.id", "test"); 
    props.put("zookeeper.connect", "localhost:2181"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("topic3", "topic2")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
    } 
}} 

出発カフカ:すべてのグループが使用して使用可能な bin/kafka-server-start.sh config/server.properties(私はすでにポートを設定して、プロパティでbrokeridファイル)

+0

端末から、カフカのコンシューマ/プロデューサツールを使用していると言うときは?たぶんあなたのJavaソースコードを投稿すると役に立つかもしれません。 – ppatierno

+0

コードが追加されました。そして、もし私がkafkaコンシューマーを端末から走らせているなら、私はJavaプロデューサーコードの上のメッセージを聞くことができます。 – mohiitb

+0

Javaプロデューサのログを有効にし、DEBUGレベルに設定して、プロデューサが何をしているのかを確認します。また、トピック名が正しいこと、およびコンシューマーの開始時設定が – PragmaticProgrammer

答えて

0

最初のチェック:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

次に、あなたのトピックは、CMDの下に使用して所属するグループを確認する:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe 

あなたのトピックと関連するグループ名を見つけたら(そうでないグループをデフォルトに属している場合だけで、あなたのグループでgroup.idを置き換える)、その後小道具以下で試してみて、それが動作するかどうか私に教えて:

props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test-consumer-group"); // default topic name 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 

    //Kafka Consumer subscribes list of topics here. 
    consumer.subscribe(Arrays.asList(topicName)); // replace you topic name 

    //print the topic name 

    java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics(); 
    System.out.println("list of topic size :" + listTopics.size()); 

    for(String topic : listTopics.keySet()){ 
     System.out.println("topic name :"+topic); 
    } 
+0

コンシューマが購読しているトピック(topic3) – mohiitb

+0

を含む、すべてのトピックを一覧表示して、すべてのトピックをJavaプログラムまたはcmdを使用してリストしていますか?あなたのトピックが**デフォルトのグループid **に属していることを意味するjavaプログラムを介している場合、これが今度は上記のpropを使用すると、トピックからデータを取得できるはずです。 – Sanjay

0

消費者がグループコーディネータに最初に登録するように、プロデューサを実行する前にコンシューマを実行します。その後、消費者がメッセージを消費するプロデューサを実行します。初めてコンシューマをグループコーディネータ消費者がメッセージを消費したオフセットがこれを使用するまで見つけるために注文するkafka-consumer-offset-checker.bat --group group-1 --topic testing-1 --zookeeper localhost:2181これは、消費者がトピックのどのオフセットを最後に消費したかを示します。

関連する問題