一連のレコードがMongoにコミットされると、手動でオフセットをコミットするユーザーを作成しています。
Mongoエラーまたはその他のエラーが発生した場合、エラー処理コレクション に後で再生するためにレコードをパースしようとします。 Mongoがダウンしている場合、Kakfaからのコミットされていないオフセットからレコードを読み取ろうとする前に、消費者が一定期間処理を停止するようにします。
以下のサンプルは機能しますが、このシナリオのベストプラクティスは何ですか?Kafka 0.9 KafkaConsumerでオフセットを手動でコミットするときにメッセージを再消費する方法
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics);
return consumer;
}
消費者を再作成しないと、次のエラーが発生します。
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
ありがとうございました。それは私の消費者の二番目の問題を解決した。コンシューマを再作成する代わりにメッセージを再処理するために、consumer.seekToBeginning()を代わりに呼び出すことができます。 –
consumer.seekToBeginning(partitions)は、送信するすべてのパーティションの最初の位置にオフセットをリセットします。私はあなたのユースケースでどのように役立つのか分かりませんが、あなたがすべてのイベントを再処理しなければならない懇願にリセットした場合、 – Nautilus
最後のオフセットコミットからすべてのイベントを再処理します。この仮定は間違っていますか?私はMongoが再び利用可能になるまで、再処理しようとしています。これがなければ、投票は次のメッセージを消費するだけです。 –