Apache Kafka
にキューイングシステムを構築しました。アプリケーションは特定のKafka topic
へのメッセージを生成し、消費者側ではそのトピックに生成されたすべてのレコードを消費する必要があります。
新しいJava Consumer APIを使用してコンシューマを作成しました。 コードはここkafkaコンシューマ(新しいコンシューマAPI)を永久に実行しています
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
のように見える私は、任意のレコードが即座に消費され、処理すべきプロデューサーカフカのトピックの中に押し込まれるように永遠に消費者を実行する必要があります。
私の混乱は、サンプルコードのような無限のwhileループを使用してデータを消費する正しい方法ですか?