2017-05-12 9 views
1

トピックの最新のメッセージから始まるカフカ消費者を欲しいです。ここでカフカ消費者は最新のメッセージから始まっていません

は、Javaコードです:

private static Properties properties = new Properties(); 
private static KafkaConsumer<String, String> consumer; 
static 
{ 
    properties.setProperty("bootstrap.servers","localhost"); 
    properties.setProperty("enable.auto.commit", "true"); 
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    properties.setProperty("group.id", "test"); 
    properties.setProperty("auto.offset.reset", "latest"); 
    consumer = new KafkaConsumer<>(properties); 

    consumer.subscribe(Collections.singletonList("mytopic")); 
} 

@Override 
public StreamHandler call() throws Exception 
{ 
    while (true) 
    { 
     ConsumerRecords<String, String> consumerRecords = consumer.poll(200); 
     Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic"); 
     for(ConsumerRecord<String, String> rec : records) 
     { 
      System.out.println(rec.value()); 
     } 
    } 
} 

auto.offset.resetの値が最新ですが、消費者は2日前に属しているフォームのメッセージを開始し、それが追いついたが、最新のメッセージと一緒に。

私には何が欠けていますか?

答えて

3

は、あなたが同じgroup.idで前にこの同じコードを実行することがありますか? auto.offset.resetパラメータは、消費者用にすでに保存されている既存のオフセットがない場合にのみ使用されます。たとえば、2日前にその例を前に実行してからもう一度実行すると、最後に消費された位置から開始されます。

手動でトピックの最後に移動する場合は、seekToEnd()を使用してください。

これについてもう少し詳しくは、https://stackoverflow.com/a/32392174/1392894を参照してください。

+0

ありがとうございます、私はあなたが正しいと思います!私は2日前にそれを使用していましたが、最新のオフセットではgroup.idが消費された最新のオフセットを意味することに気づいていませんでした。 – Ehsan

関連する問題