0
消費者がcomitAsyncをログの最初の2つのオフセットにすることができない奇妙な問題に直面しました。 ..私は引用誰か.Can消費者によって受信され、succesfulyコミットプロデューサーの同じ非同期送信で他のメッセージがこの問題の原因を見つけるので、それは非常に奇妙である私の次のコードと出力例commitAsyncが最初の2つのオフセットをコミットできないのはなぜですか?
package com.panos.example;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class Consumer extends ShutdownableThread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
public Consumer(String topic) {
super("KafkaConsumerExample", false);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.75:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(props);
this.topic = topic;
}
@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
try {
ConsumerRecords<Integer, String> records = consumer.poll(1000);
long startTime = System.currentTimeMillis();
if (!records.isEmpty()) {
System.out.println("C : {} Total No. of records received : {}" + records.count());
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
consumer.commitAsync(new ConsumerCallBack(startTime,record.value(), record.offset()));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String name() {
return null;
}
@Override
public boolean isInterruptible() {
return false;
}
class ConsumerCallBack implements OffsetCommitCallback {
private final long startTime;
private String message;
private final String NewLine = System.getProperty("line.separator");
private long offset;
public ConsumerCallBack(long startTime) {
this.startTime = startTime;
}
public ConsumerCallBack(long startTime, String message, long offset) {
this.startTime = startTime;
this.message=message;
this.offset = offset;
}
public void onComplete(Map<TopicPartition, OffsetAndMetadata> CurrentOffset,
Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (exception != null) {
System.out.println("Message : {" + message + "}, committed successfully at offset " + offset +
CurrentOffset + "elapsed time :" + elapsedTime);
} else {
System.out.println(exception.toString());
/* JOptionPane.showMessageDialog(new Frame(),
"Something Goes Wrong with the Server Please Try again Later.",
"Inane error",
JOptionPane.ERROR_MESSAGE);*/
}
}
}
}
あなたはすべてのメッセージが正常にコミットされたことを見ることができます。なぜこれが起こるのですか?
Received message: (1, Message_1) at offset 160
Received message: (2, Message_2) at offset 161
Received message: (3, Message_3) at offset 162
Received message: (4, Message_4) at offset 163
Message : {Message_3}, committed successfully at offset 162{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6
Message : {Message_4}, committed successfully at offset 163{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6