私たちはメッセージブローカーソリューションとしてKafkaを実装しようとしています。私たちはSpring BootマイクロサービスをIBM BLuemixに導入しています。内部メッセージブローカーの実装はKafkaバージョン0.10です。私の経験はJMS、ActiveMQの方が多いので、Javaのコンシューマのシステムレベルのエラーを処理するにはどうすればよいでしょうか?ここでApache Kafkaシステムエラー処理
は、我々が現在
消費者の特性
enable.auto.commit=false
auto.offset.reset=latest
我々は
消費者max.partition.fetch.bytes
session.timeout.ms
カフカのデフォルトのプロパティを使用している、それを実装している方法です
トピックごとに3つのスレッド、つまりすべてが同じgroupId(スレッドごとに1つのKafkaConsumerインスタンス)を持つようにスピンアップしています。現在のところ、パーティションは1つしかありません。消費者のコードは、スレッドクラス
kafkaConsumer = new KafkaConsumer<String, String>(properties);
final List<String> topicList = new ArrayList<String>();
topicList.add(properties.getTopic());
kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
try {
logger.info("Partitions assigned, consumer seeking to end.");
for (final TopicPartition partition : partitions) {
final long position = kafkaConsumer.position(partition);
logger.info("current Position: " + position);
logger.info("Seeking to end...");
kafkaConsumer.seekToEnd(Arrays.asList(partition));
logger.info("Seek from the current position: " + kafkaConsumer.position(partition));
kafkaConsumer.seek(partition, position);
}
logger.info("Consumer can now begin consuming messages.");
} catch (final Exception e) {
logger.error("Consumer can now begin consuming messages.");
}
}
});
実際の読み取りを処理するサービスクラスがあるあなたはEventProcessorに気づくでしょう
try {
// Poll on the Kafka consumer every second.
final ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// Iterate through all the messages received and print their
// content.
for (final TopicPartition partition : records.partitions()) {
final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
logger.info("consumer is alive and is processing "+ partitionRecords.size() +" records");
for (final ConsumerRecord<String, String> record : partitionRecords) {
logger.info("processing topic "+ record.topic()+" for key "+record.key()+" on offset "+ record.offset());
final Class<? extends Event> resourceClass = eventProcessors.getResourceClass();
final Object obj = converter.convertToObject(record.value(), resourceClass);
if (obj != null) {
logger.info("Event: " + obj + " acquired by " + Thread.currentThread().getName());
final CommsEvent event = resourceClass.cast(converter.convertToObject(record.value(), resourceClass));
final MessageResults results = eventProcessors.processEvent(event
);
if ("Success".equals(results.getStatus())) {
// commit the processed message which changes
// the offset
kafkaConsumer.commitSync();
logger.info("Message processed sucessfully");
} else {
kafkaConsumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
logger.error("Error processing message : {} with error : {},resetting offset to {} ", obj,results.getError().getMessage(),record.offset());
break;
}
}
}
}
// TODO add return
} catch (final Exception e) {
logger.error("Consumer has failed with exception: " + e, e);
shutdown();
}
スレッドのrunメソッドで起こるのコンストラクタで、このようになります。ほとんどの場合、各レコードはデータベース内のレコードをコミットします。プロセッサがエラー(System ExceptionまたはValidationException)をスローした場合、コミットせずにシークを設定して、後続のポーリングがそのグループIDのオフセットから返るようにします。
今疑いがあるのは、正しいアプローチですか?エラーが発生し、オフセットを設定すると、それが修正されるまで他のメッセージは処理されません。これは、DBに接続できないようなシステムエラーのために働くかもしれませんが、問題がそのイベントのみであり、他のレコードはこのレコードを処理できない場合、他のレコードは処理できません。 ErrorTopicという概念を考えました。エラーが発生すると、消費者はErrorTopicにそのイベントを公開し、その間に他の後続イベントを処理し続けます。しかし、私たちは以前の経験から、JMSの設計コンセプトをkafkaに取り込もうとしているようで、カフカのエラー処理を解決する良い方法があるかもしれません。また、エラートピックから再処理することで、いくつかのシナリオでは望ましくないメッセージの順序が変更される可能性があります。
Kafka基準に従ってプロジェクトでこのシナリオをどのように処理したか教えてください。
-Tatha
あなたはメッセージハブを使用しているか、Bluemixでカフカブローカーを自分で実装したのですか? – ValerieLampkin
@ValerieLampkinメッセージハブを使用しています – Tatha