0

マイクラスタ構成、クラスの詳細とジャーバージョンは、私が始めている質問org.apache.kafka.common.KafkaException: Failed to construct kafka consumerorg.springframework.messaging.MessageHandlingException:欠落しているメソッドのパラメータの型のヘッダー「kafka_receivedMessageKey」[クラスjava.lang.Integerの]

に記載されていますZookeeperサーバー、Kafkaサーバー、Kafka RESTサーバー次に、spring-kafka-webhook-service.warというファイル名のspring-boot warファイルをtomcatにデプロイします。

私はKafka RESTプロキシクライアントを通じてメッセージを投稿しているので、@KafkaListenerメソッドが受信メッセージConsumerRecordを読み取ることができないことを示唆する以下のエラーが表示されます。どんな入力も高く評価されます。

マイカフカ、残りのプロパティは、現在、以下のように設定されます。

confluent-3.3.0/etc/kafka-rest/kafka-rest.properties 

id=kafka-rest-test-server 
schema.registry.url=http://localhost:8081 
zookeeper.connect=localhost:2181 

エラーログを戦争の展開後にTomcat上

2017-12-26 09:11:01.143 ERROR 20430 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 3, CreateTime = 1514279460946, checksum = 1183108784, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27']) 

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message 
Endpoint handler details: 
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent,java.lang.String,java.lang.Integer,int,java.lang.String)] 
Bean [[email protected]]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer], failedMessage=GenericMessage [payload=InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'], headers={kafka_offset=3, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}] 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:183) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151] 
     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer] 
     at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] 
     at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na] 
     ... 8 common frames omitted 

答えて

1

のみPOJOを持つメソッドを使用して@KafkaListenerを使用します「InventoryEvent」をパラメータとして

InventoryEvent event 

代わりの

@Payload InventoryEvent event, 
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, 
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
@Header(KafkaHeaders.OFFSET) String offset 

SOプロデューサーは、春の統合カフカであればそれはそこに送られアルテムビランhttps://stackoverflow.com/a/32125453/786676

+1

で答えるこの中に指定されているようkafka_receivedMessageKeyがカフカ経由で送信されることはありませんので、これは問題を解決しました。 OTOH '@ Header'は不在ヘッダを無視する' required'オプションを持っています –

+0

それを得ました!ありがとう – somnathchakrabarti

関連する問題