2017-09-20 6 views
0

私はカフカのトピックからのメッセージをSpring Kafka consumerを使用して消費しようとしています。以下のエラーが表示されます。私のローカルマシンに設定されたカフカのトピックからのメッセージを消費するときにうまく動作します -o.apache.kafka.clients.NetworkClient - ブートストラップブローカー<hostname>:9092が切断されました

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.apache.kafka.clients.NetworkClient - ブートストラップブローカー<hostname>:9092は、私は、コマンドラインに

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ctp_verbose_amcs --from-beginning --zookeeper localhost:2181 

コード

@EnableKafka 
@Configuration 
public class KafkaConsumerConfig { 

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class); 

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}") 
    private String bootstrapAddress; 

    @Value(value = "${groupId:amcs-tas}") 
    private String groupId; 

    @Bean 
    public ConsumerFactory<String, Map<String, Object>> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ConciseMessageDeserializer()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 
} 
を使用してメッセージを読み取ることができています

を切断しますサーバーログで

エラーメッセージ

[2017-09-20 14:33:44,448] ERROR Closing socket for <hostname>:9092-10.251.127.31:51014 because of error (kafka.network.Processor) 
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2 
     at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95) 
     at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87) 
     at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488) 
     at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
     at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483) 
     at kafka.network.Processor.run(SocketServer.scala:413) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2 
     at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31) 
     at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44) 
     at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60) 
     at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96) 
     at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48) 
     at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92) 
     ... 10 more 
+0

本当に「」が表示されますか?または、実際のホスト名を難読化しましたか?前者の場合、あなたの財産は価値がありません。後者の場合、おそらくネットワーク上の問題です。 –

+0

はい、実際のホスト名が表示されます。私はこの投稿を作成中に削除しました。 –

+0

サーバログを確認し、ネットワークトレースを確認します。 –

答えて

1

Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2

あなたのクライアントのバージョンは、ブローカーのバージョンと互換性がありません。

互換性マトリックスの下部にthe project pageを参照してください。

+0

ありがとうゲーリー、今すぐチェックします –

+0

私はこのバージョンのkafka - kafka_2.11-0.10.0.0を使用しています。互換性のあるクライアントのページリストには、spring-kafkaライブラリが用意されています。私は以下を使用しましたが、エラーは表示されませんが、メッセージはトピックから消費されていないようです。私が提供したカフカ版には、異なるバージョンのクライアントを使用する必要がありますか? \tコンパイルグループ: 'org.apache.kafka'、名前: 'kafka-clients'、バージョン: '0.10.2.0' コンパイルグループ: 'org.springframework.kafka'、名前: 'spring-kafka'、version: ' 1.2.2.RELEASE ' –

+0

10.2クライアントが10.0ブローカと互換性があるかどうかわかりません。そうでないように聞こえる。 Kafkaのドキュメントを確認してください。 –

関連する問題