私はKafka 0.9.0.1を使用しています。カフカ:トピックからの最初のメッセージを消費するときの断続的な遅さ
それは私が(別のconfigsで)異なるカフカブローカーを使用しましたトピックから「最新」のメッセージを取得するために、20〜30秒かかり、私は自分のアプリケーションを起動する初めて
まだ私はまだ、この動作を参照してください。通常、後続のメッセージには遅さはありません。
これが期待どおりの動作ですか?あなたは明らかに、このサンプルアプリケーションを実行し、独自の設定にブローカー/トピック名を変更することにより、以下、これを見ることができます
あなたが指定したコンシューマ・グループに新しい消費者を起動したときので、最初のメッセージは残りの部分よりも長く取る必要がありますpublic class KafkaProducerConsumerTest {
public static final String KAFKA_BROKERS = "...";
public static final String TOPIC = "...";
public static void main(String[] args) throws ExecutionException, InterruptedException {
new KafkaProducerConsumerTest().run();
}
public void run() throws ExecutionException, InterruptedException {
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
consumerProperties.setProperty("group.id", "Test");
consumerProperties.setProperty("auto.offset.reset", "latest");
consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);
Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume());
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
kafkaProducer.publish("Test Message");
}
}
class MyKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
private KafkaConsumer<String, Object> kafkaConsumer;
public MyKafkaConsumer(Properties properties, String topic) {
kafkaConsumer = new KafkaConsumer<String, Object>(properties);
kafkaConsumer.subscribe(Lists.newArrayList(topic));
}
public void consume() {
while (true) {
logger.info("Started listening...");
ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
logger.info("Received records {}", consumerRecords.iterator().next().value());
}
}
}
class MyKafkaProducer {
private KafkaProducer<String, Object> kafkaProducer;
private String topic;
public MyKafkaProducer(Properties properties, String topic) {
this.kafkaProducer = new KafkaProducer<String, Object>(properties);
this.topic = topic;
}
public void publish(Object object) throws ExecutionException, InterruptedException {
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
response.get();
}
}
試していただきありがとうございます、私は断続的にその瞬間的な動作を見ましたが、あなたが数回それを試してくださいしていない場合は、遅延が表示されます。また、私はあなたの理論に感謝しますが、私はまた、 "リスニングを始める"の後に2回目のメッセージを公開しました。それでも20秒ほどかかります – DJ180