3

すでに実行中の別のカフカを購読するApache Kafkaコンシューマーをビルドします。今、私の問題は、私のプロデューサーがサーバーにメッセージをプッシュするとき、私の消費者はそれらを受け取らないということです。私はそのリンクからすべての情報を取得Apache Kafkaでコンシューマーがメッセージを受け取っていない

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

ここで私は、私はこの依存関係を使用する。ここ

 Properties properties = new Properties(); 
     properties.put("metadata.broker.list","Running kafka ip addr:9092"); 
     properties.put("serializer.class","kafka.serializer.StringEncoder"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
     String filePath="filepath"; 
     File rootFile= new File(filePath); 
     Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE); 
     for(File file : allFiles) { 
      StringBuilder sb = new StringBuilder(); 
      sb.append(file); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString()); 
      System.out.println("sending msg from producer.."+sb.toString()); 
      producer.send(message); 
     } 
      producer.close(); 

、消費者コード、

  properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("enable.auto.commit", "false"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 
     consumer.subscribe(Collections.singletonList(topicName)); 
     while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) 
       { 
        System.out.println("topic = "+record.topic()); 
        System.out.println("topic = "+record.partition()); 
        System.out.println("topic = "+record.offset()); 
       } 
       try { 
        consumer.commitSync(); 
       } catch (CommitFailedException e) { 
        System.out.printf("commit failed", e) ; 
       } 
      } 

をプロデューサーコードを与えます:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

コンシューマを実行しているとき、コンシューマ側から通知はありませんでした。私に何か考えてください。プロデューサーのために

+0

問題がどこにあるのかを理解しようとします。消費者またはプロデューサーのサイズです。このため:トピックのオフセットをチェックします。それはコマンドライン – Natalia

+0

から行うことができますあなたはクラスタ上のjarファイルとして実行していますか?あなたの動物園のポートを確認してください。 –

+0

@Natalia:プロデューサーを通じてメッセージを投稿できます。私はログのサイズと共に増加するメッセージ数を見ることができます。しかし、オフセットは増加していません... –

答えて

0

:私は推測

properties.put("metadata.broker.list","Running kafka ip addr:9092"); 

、これは "bootstrap.servers" でなければなりません。消費者のために

properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 

bootstrap.serversは、ブローカーに、ではないZKを指している必要があります。

「問題」とは、指定されたホスト/ポートにブローカがない場合には、ブローカを待機するだけで、失敗しないことです。

+0

同じIPでブローカーと飼い猫の両方を実行しています。その1つのノードのインストール。したがって、私は同じIPを与えた。私は別のvmsで飼い葉桶とブローカーを実行する必要がありますか? –

+0

別のサーバーで実行する必要はありませんが、お勧めします。とにかく、ZKとブローカは別のポートを使います、そして、 '2181'はZKのデフォルトポートです - したがって、あなたはブローカポート(デフォルト:' 9092')のポイントを必要としています。 –

+0

@ Matthias J. Sax - 消費者側のメッセージ –

0

私はカフカとJavaの初心者くさいんだけど、私はプロデューサーが実際に次のコマンド/usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginningを使用してトピックに書き込みをしていることを確認し、次のアプローチ

  • を提案したいでしょう。
  • そうであれば、おそらくコンシューマーコードに焦点を当てる必要があります。コンフルエントのガイドはかなり役に立ちます。
+0

、私はすでにチェックしました。プロデューサーの話題を完全に書いているが、コンシューマーも走っているがまだメッセージが出ない –

関連する問題