を追加しました。..カフカの消費者は、私はカフカサーバからのメッセージ(トピック)を消費するKafkaConsumerを使用してい
- それは消費者のコードを開始する前に作成したトピックのため正常に動作します...
しかし、問題は動的に作成されたトピック(コンシューマーコードが開始された後のことを言いたい)がAPIに動的トピックの作成をサポートすると言われている場合は機能しません。
使用カフカ版:0.9.0.1ここで
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
は... JAVAコードであるProperties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Pattern r = Pattern.compile("siddu(\\d)*");
consumer.subscribe(r, new HandleRebalance());
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注:私のトピック名が.. を正規表現にマッチしていると私は消費者を再起動した場合、それがプッシュされたメッセージを読み始めますトピックに...
すべてのヘルプは本当にあなたが飼育係にフックすることができます...
urの返信と助けてくれてありがとう...基本的に私はこれを達成するためにKafkaConsumerのAPIを使いたいと思っていました。私はそれを自分で解決しました。 – siddu
これはどのように解決されましたか?私も同じ問題があります。 – madlad
@sidduこの問題をどのように解決したか教えていただけますか? – bhspencer