我々は、次のAPIを使用して複数のトピックをサブスクライブすることができます。 consumer.subscribe(は、Arrays.asList(topic1、topic2)、ConsumerRebalanceListenerのOBJ)
消費者は、トピックの情報を持っており、我々はconsumer.commitAsyncや消費者を使用してCOMITすることができます次のようにOffsetAndMetadataオブジェクトを作成して.commitSync()を呼び出します。
ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
私は知っていますが、オフセットはどのようにカフカによって維持されますか?また、単一の消費者グループを持つことで私の問題が解決されますか? – Apollo
オフセットはアプリケーションによってコミットされ、__consumer_offsetsという特別なオフセットカーフカのトピックに格納されます。オフセットは各トピックの各パーティションごとに保持されるので、購読しているトピックの数は関係ありません。 –