トピックの最新のメッセージから始まるカフカ消費者を欲しいです。ここでカフカ消費者は最新のメッセージから始まっていません
は、Javaコードです:
private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
properties.setProperty("bootstrap.servers","localhost");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("mytopic"));
}
@Override
public StreamHandler call() throws Exception
{
while (true)
{
ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
for(ConsumerRecord<String, String> rec : records)
{
System.out.println(rec.value());
}
}
}
auto.offset.resetの値が最新ですが、消費者は2日前に属しているフォームのメッセージを開始し、それが追いついたが、最新のメッセージと一緒に。
私には何が欠けていますか?
ありがとうございます、私はあなたが正しいと思います!私は2日前にそれを使用していましたが、最新のオフセットではgroup.idが消費された最新のオフセットを意味することに気づいていませんでした。 – Ehsan