すでに実行中の別のカフカを購読するApache Kafkaコンシューマーをビルドします。今、私の問題は、私のプロデューサーがサーバーにメッセージをプッシュするとき、私の消費者はそれらを受け取らないということです。私はそのリンクからすべての情報を取得Apache Kafkaでコンシューマーがメッセージを受け取っていない
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
: ここで私は、私はこの依存関係を使用する。ここ
Properties properties = new Properties();
properties.put("metadata.broker.list","Running kafka ip addr:9092");
properties.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
String filePath="filepath";
File rootFile= new File(filePath);
Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE);
for(File file : allFiles) {
StringBuilder sb = new StringBuilder();
sb.append(file);
KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString());
System.out.println("sending msg from producer.."+sb.toString());
producer.send(message);
}
producer.close();
、消費者コード、
properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
properties.put("group.id","test-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = "+record.topic());
System.out.println("topic = "+record.partition());
System.out.println("topic = "+record.offset());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.printf("commit failed", e) ;
}
}
をプロデューサーコードを与えます:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
コンシューマを実行しているとき、コンシューマ側から通知はありませんでした。私に何か考えてください。プロデューサーのために
問題がどこにあるのかを理解しようとします。消費者またはプロデューサーのサイズです。このため:トピックのオフセットをチェックします。それはコマンドライン – Natalia
から行うことができますあなたはクラスタ上のjarファイルとして実行していますか?あなたの動物園のポートを確認してください。 –
@Natalia:プロデューサーを通じてメッセージを投稿できます。私はログのサイズと共に増加するメッセージ数を見ることができます。しかし、オフセットは増加していません... –