2017-03-06 8 views
0

私はspring-cloud-stream-schemaを使って、kafkaからのavroメッセージを読んでいます。私はMessagesChannelsで入力チャンネルを設定した:春の雲kafkaとavroのシリアル化の問題

@Input("topicName1") 
SubscribableChannel fromInput1(); 

私はそのような設定ファイルを持っている:私はこのエラーを得た

@Configuration 
@EnableBinding(MessagesChannels.class) 
@EnableSchemaRegistryClient 
public class MessageConfiguration { 
    @Bean 
    public MessageConverter topic1MessageConverter() throws IOException { 
    return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); 
    } 
} 

そして、私の消費者は

私は実際に送信
fromInput1().subscribe(this::onMessage); 

void onMessage(Message message) { 
} 

メッセージと呼ばれている

nested exception is java.lang.ClassCastException: 
org.apache.avro.generic.GenericData$Record cannot be cast to [B 

実際に生のバイトは正しく解析されますorg.apache.avro.generic.GenericData$Record。しかし、春はMessageクラスが必要です。 GenericData$RecordMessageにキャストする方法、またはGenericData$Recordをavro-toolsクラスによって直接生成する方法

詳細:

2017-03-06 11:23:10.695 ERROR 19690 --- [afka-listener-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 7979, CreateTime = 1488784987569, checksum = 623709057, serialized key size = -1, serialized value size = 36, key = null, value = {"foor": "bar"}) 

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cl[email protected]4bf9d802]; nested exception is java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to [B 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) 

答えて

1

私はあなたが指定したhere

として application/*+avroを使用するには、着信メッセージチャネルのための contentTypeを設定する必要があると思います
関連する問題