2016-01-20 2 views
8

一連のレコードがMongoにコミットされると、手動でオフセットをコミットするユーザーを作成しています。
Mongoエラーまたはその他のエラーが発生した場合、エラー処理コレクション に後で再生するためにレコードをパースしようとします。 Mongoがダウンしている場合、Kakfaからのコミットされていないオフセットからレコードを読み取ろうとする前に、消費者が一定期間処理を停止するようにします。
以下のサンプルは機能しますが、このシナリオのベストプラクティスは何ですか?Kafka 0.9 KafkaConsumerでオフセットを手動でコミットするときにメッセージを再消費する方法

while (true) { 
    boolean commit = false; 
    try { 
     ConsumerRecords<K, V> records = consumer.poll(consumerTimeout); 
     kafkaMessageProcessor.processRecords(records); 
     commit = true; 
    } 
    catch (Exception e) { 
     logger.error("Unable to consume closing consumer and restarting", e); 
     try { 
      consumer.close(); 
     } 
     catch (Exception consumerCloseError) { 
      logger.error("Unable to close consumer", consumerCloseError); 
     } 
     logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e); 
     Thread.sleep(recoveryInterval); 
     consumer = createConsumer(properties); 
    } 
    if (commit) { 
     consumer.commitSync(); 
    } 

} 

private KafkaConsumer<K, V> createConsumer(Properties properties) { 
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties); 
    consumer.subscribe(topics); 
    return consumer; 
} 

消費者を再作成しないと、次のエラーが発生します。

o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead. 
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer 

答えて

5

あなたはオフセット犯していないとモンゴへの呼び出しは、あなたがちょうどあなたが必要だと思う時間を待って、ポーリングを再試行失敗した後、ときauto.commit.enableプロパティがfalseの場合()。

あなたが見ている問題は、新しいコンシューマがハートビートメカニズムとしてpoll()を使用することです。したがって、タイムアウトリクエストが長くなるのを待っていると、トピックのコーディネータは消費者を蹴飛ばすでしょう。それはグループを再バランスさせるでしょう。だからmongoを待つが、あなたはしばらくの間、それらをポーリングしたいかもしれない。

EDIT:回避策としては、

はそれが役に立てば幸い、このプロパティが高いrequest.timeout.msを置くことができます!

+0

ありがとうございました。それは私の消費者の二番目の問題を解決した。コンシューマを再作成する代わりにメッセージを再処理するために、consumer.seekToBeginning()を代わりに呼び出すことができます。 –

+0

consumer.seekToBeginning(partitions)は、送信するすべてのパーティションの最初の位置にオフセットをリセットします。私はあなたのユースケースでどのように役立つのか分かりませんが、あなたがすべてのイベントを再処理しなければならない懇願にリセットした場合、 – Nautilus

+0

最後のオフセットコミットからすべてのイベントを再処理します。この仮定は間違っていますか?私はMongoが再び利用可能になるまで、再処理しようとしています。これがなければ、投票は次のメッセージを消費するだけです。 –

1

私が理解しているように、(新しい)クライアントは、消費されたオフセットを保持するクライアントです。コミットはオフセットをサーバに送信しますが、クライアントからサーバに「そのオフセットで次のメッセージを渡す」と言うので、そのクライアントからの次のポーリングには影響しません。 オフセットはなぜサーバーに送信されますか?次の再バランスのために。したがって、サーバがコミットされたオフセットを使用する唯一の状況は、一部のクライアントが切断または切断された場合です。その後、パーティションが再調整され、この再バランスによってクライアントはサーバからオフセットを取得します。

したがって、オフセットをコミットせずにpoll()を呼び出すと、メッセージが再度読み取られることは期待できません。これには、クライアントのオフセットをロールバックする必要があります。私は試していないが、私は失敗したメッセージのオフセットにKafkaConsumer.seekを呼び出すことがそのトリックを行うべきだと思う。

https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

ところで、このようにあなたも、最後successfuly処理されたメッセージをコミットすることができますし、障害がいくつかのメッセージのために発生したときに、全体のレコードリストを繰り返す必要がないように、失敗した最初に求めますその真ん中に

6

ここに私のコードはクライアントバージョン0.10.0を使用しています。

あなたが要求しているようです。

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.OffsetAndMetadata; 
import org.apache.kafka.clients.consumer.OffsetCommitCallback; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MessageProcesser implements Runnable { 

    private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class); 

    private final ExecutorService pool = Executors.newFixedThreadPool(4); 

    private final KafkaConsumer<String, String> consumer; 

    private final String topic; 

    private final AtomicBoolean closed = new AtomicBoolean(false); 

    public MessageProcesser(String groupId, String topic, String kafkaServer) { 
     this.topic = topic; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", kafkaServer); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     props.put("enable.auto.commit", "false"); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 

      consumer.subscribe(Collections.singleton(topic)); 

      while (true) { 
       if (closed.get()) { 
        consumer.close(); 
       } 

       ConsumerRecords<String, String> records = consumer.poll(1000 * 60); 
       for (ConsumerRecord<String, String> record : records) { 

        String value = record.value(); 
        if (null == value) { 
         continue; 
        } 

        boolean processResult = false; 
        try { 
         Future<Object> f = pool.submit(new ProcessCommand(value)); 
         processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS); 
        } catch (Exception e) { 
         logger.error(e.getMessage(), e); 
        } 

        if (!processResult) { 
         //here if process fail, seek to current offset 
         consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset()); 
        } else { 
         this.commitAsyncOffset(record); 
        } 
       } 
      } 
     } catch (Exception e) { 
      logger.error(e.getMessage(), e); 
      if (!closed.get()) { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) { 
        // ignore 
       } 
      } 
     } 
    } 

    public void shutdown() { 
     closed.set(true); 
    } 

    public void commitAsyncOffset(ConsumerRecord<String, String> record) { 
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); 
     offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); 

     consumer.commitAsync(offsets, new OffsetCommitCallback() { 
      @Override 
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { 
       if (e != null) { 
        logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace())); 
       } 
      } 
     }); 
    } 
} 
+0

先生、シークは必要ですか? –

+0

はい、シークはnesscessoryです。 Javaクライアントは現在のオフセットを覚えています。 – Hlex

+0

レコードが複数あり、エラーが発生した場合、このコードには問題があります。同じパーティションで後のレコードを処理すべきではありません。 – Hlex

関連する問題