常に聞くDatabase
のデータをプッシュするKafka topics
のコンシューマを作成する必要があります。Javaを使用して単一のサイクルでKafkaから複数のレコードを読み取る方法
ここでの要件は次のとおりです。 - カフカから複数のレコードを1つのサイクルで読み取る場合は、1回の呼び出しで複数回ではなくdbにプッシュしようとします。
public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
String bufferMemory) throws Exception {
ObjectMapper mapper = new ObjectMapper();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));
consumer.subscribe(Arrays.asList(topicName));
logger.info("subscibed to the topic {}", topicName);
cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
session = cluster.connect(KEYSPACE);
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
Model model= mapper.readValue(record.value(), Model.class);
try {
boolean flag = insertIntoDB(session, model);
if (flag) {
logger.info("************ Data Persisted Successfully ***************");
} else {
logger.info("******* Data Persition Failed *************");
}
} catch (Exception ex) {
logger.error("Exception while persisting data into DB", ex);
}
}
} catch (Exception ex) {
logger.error("Exception while reading data from kafka", ex);
}
}
} finally {
consumer.close();
}
}
だからあなたの本当の問題は、DBに複数のレコードを挿入する方法ですか? – GuangshengZuo
私は1レコードを読んでいて、その1レコードをDBに挿入できますが、 'Kafka'から複数のレコードを読み込む方法と、それらを一度の呼び出しでDBに挿入する方法はありますか? – Sat
kafkaからの読み方を示すコードを追加 – Natalia