私はカフカのトピックからのメッセージをSpring Kafka consumerを使用して消費しようとしています。以下のエラーが表示されます。私のローカルマシンに設定されたカフカのトピックからのメッセージを消費するときにうまく動作します -o.apache.kafka.clients.NetworkClient - ブートストラップブローカー<hostname>:9092が切断されました
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.apache.kafka.clients.NetworkClient - ブートストラップブローカー<hostname>:9092
は、私は、コマンドラインに
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ctp_verbose_amcs --from-beginning --zookeeper localhost:2181
コード
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
@Value(value = "${kafka.bootstrapAddress:localhost:9092}")
private String bootstrapAddress;
@Value(value = "${groupId:amcs-tas}")
private String groupId;
@Bean
public ConsumerFactory<String, Map<String, Object>> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ConciseMessageDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
を使用してメッセージを読み取ることができています
を切断しますサーバーログで
エラーメッセージ
[2017-09-20 14:33:44,448] ERROR Closing socket for <hostname>:9092-10.251.127.31:51014 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
... 10 more
本当に「」が表示されますか?または、実際のホスト名を難読化しましたか?前者の場合、あなたの財産は価値がありません。後者の場合、おそらくネットワーク上の問題です。 –
はい、実際のホスト名が表示されます。私はこの投稿を作成中に削除しました。 –
サーバログを確認し、ネットワークトレースを確認します。 –