0
私はspring-cloud-stream-schema
を使って、kafkaからのavroメッセージを読んでいます。私はMessagesChannels
で入力チャンネルを設定した:春の雲kafkaとavroのシリアル化の問題
@Input("topicName1")
SubscribableChannel fromInput1();
私はそのような設定ファイルを持っている:私はこのエラーを得た
@Configuration
@EnableBinding(MessagesChannels.class)
@EnableSchemaRegistryClient
public class MessageConfiguration {
@Bean
public MessageConverter topic1MessageConverter() throws IOException {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
そして、私の消費者は
私は実際に送信fromInput1().subscribe(this::onMessage);
void onMessage(Message message) {
}
メッセージと呼ばれている
:nested exception is java.lang.ClassCastException:
org.apache.avro.generic.GenericData$Record cannot be cast to [B
実際に生のバイトは正しく解析されますorg.apache.avro.generic.GenericData$Record
。しかし、春はMessage
クラスが必要です。 GenericData$Record
をMessage
にキャストする方法、またはGenericData$Record
をavro-toolsクラスによって直接生成する方法
詳細:
2017-03-06 11:23:10.695 ERROR 19690 --- [afka-listener-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 7979, CreateTime = 1488784987569, checksum = 623709057, serialized key size = -1, serialized value size = 36, key = null, value = {"foor": "bar"})
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cl[email protected]4bf9d802]; nested exception is java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to [B
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)