私はカフカを使い始めました。私は消費者に小さな問題に直面しています。私はJavaで消費者を書いた。カフカJavaコンシューマーは既に閉店
この例外が発生します - IllegalStateExceptionこのコンシューマは既に閉じられています。
私は次の行に例外を取得:
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
私の消費者は、いくつかの例外を除いてクラッシュした後にこれが起こって始めたと私は再びそれを実行しているしようとしたとき、それは私に、この例外を与えました。ここで
は完全なコードです:
package StreamApplicationsTest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class StreamAppConsumer {
public static void main(String[] args){
int i = 0;
//List<String> topics = new ArrayList<>();
List<String> topics = Collections.singletonList("test_topic");
//topics.add("test_topic");
Properties consumerConfigurations = new Properties();
consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId");
Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations);
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator();
while(iterator.hasNext()){
i++;
ConsumerRecord<String,String> consumerRecord = iterator.next();
String key = consumerRecord.key();
String value = consumerRecord.value();
if(key=="exit" || value=="exit")
break;
System.out.println("Key="+key+"\tValue="+value);
}
System.out.println("Messages processed = "+Integer.toString(i));
consumer.close();
}
}
}
私は助けの任意の並べ替えが有用であろう、この問題で立ち往生しています。