2017-04-19 11 views
4

私がenable.auto.commit=falseを持っていて、consumer.commitAsync()と呼ぶことなくconsumer.poll()を呼び出すと、なぜconsumer.poll()は次回の呼び出し時に 新しいレコードを返しますか?Consumer.poll()はオフセットをコミットせずに新しいレコードを返しますか?

私はオフセットをコミットしなかったので、poll()は同じレコードであるはずの最新のオフセットを返します。

処理中に障害シナリオを処理しようとしているため、私は尋ねています。私はオフセットをコミットせずに、poll()が再び同じレコードを返すことを望んでいたので、それらの失敗したレコードを再度処理することができます。

public class MyConsumer implements Runnable { 
    @Override 
    public void run() { 
     while (true) { 
      ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE); 
      for (ConsumerRecord record : records) { 
       try { 
        //process record 
        consumer.commitAsync(); 
       } catch (Exception e) { 
       } 
       /** 
       If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
       **/ 
      } 

     } 
    } 
} 

答えて

3

ポーリングの開始オフセットは、ブローカーによってではなく、消費者によって決定されます。消費者は、最後に受信したオフセットを追跡し、次のポーリング中に次の一連のメッセージを要求する。

コンシューマが停止または失敗し、最後に消費されたオフセットを認識していない別のインスタンスがパーティションの消費をピックアップすると、オフセットコミットが再生されます。

KafkaConsumerにはかなり読める価値のあるかなり広範なJavadocがあります。

+1

意味があります。しかし、[poll()doc](https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long))には、「最後に消費されたオフセットは手動でseek(TopicPartition、long)で設定するか、購読済みパーティションのリストの最後にコミットされたオフセットとして自動的に設定されます。 最後のコミットされたオフセットに消費されたオフセットを設定する、新しいオフセットをコミットしないと 'poll()'が新しいレコードを返さないようにする必要があります。私の理解は正しいのですか? – Glide

+1

私はドキュメンテーションの部分は消費の出発点/オフセットを指していると思います。だからあなたはどちらかを開始またはコミットされたオフセットを使用してどこからでも起動します。 – ftr

+1

'poll()'が最初に呼び出されたときにのみコミットされたオフセットを使用することを意味しますか? – Glide

関連する問題