1

特定のトピックをリッスンしている消費者のメッセージをJSONとして処理しようとしています。私は春のドキュメントhereで与えられたアプローチに従ってみましたが、JSONとしてメッセージを取得できません。Spring-Kafka逆シリアル化

これは、レシーバの設定のために私のコードです:

@Configuration 
@EnableKafka 
public class ReceiverConfig { 

@Value("${kafka.bootstrap.servers}") 
private String bootstrapServers; 

@Bean 
public Map consumerConfigs() { 
    Map props = new HashMap<>(); 
    // list of host:port pairs used for establishing the initial connections 
    // to the Kakfa cluster 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    // consumer groups allow a pool of processes to divide the work of 
    // consuming and processing records 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist"); 

    return props; 
} 

@Bean 
public ConsumerFactory consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 

    return factory; 
} 

@Bean 
public Receiver receiver() { 
    return new Receiver(); 
} 

@Bean 
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
     new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setMessageConverter(new StringJsonMessageConverter()); 
    return factory; 
} 
} 

消費者:私は、リモートサーバー上で話題を公開しようとすると

public class Receiver { 

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); 

private CountDownLatch latch = new CountDownLatch(1); 

@KafkaListener(topics = "Reservation", 
     containerFactory = "kafkaJsonListenerContainerFactory") 
public void receiveMessage(Message<?> message) { 
    LOGGER.info("received message='{}'", message); 
    latch.countDown(); 
} 

public CountDownLatch getLatch() { 
    return latch; 
} 

} 

私は次のエラーを取得する:

 2017-02-09 13:42:49.122 [1;31mERROR[0;39m [36mo.s.k.listener.LoggingErrorHandler[0;39m Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi) 
     org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81) 
      at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82) 
      at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975) 
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
      at java.lang.Thread.run(Thread.java:745) 
     Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
      at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) 
      at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
      at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
      at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880) 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78) 
      ... 11 common frames omitted 

しかし、リスナーからコンテナファクトリーを削除すると、メッセージを受信できますが、J SON形式が、文字列として:

2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='{' 
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_eventType":"Reservation",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_operation":"create",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "type":"excursion",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "master":true,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partySize":2,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "startTime":"2017-01-27T08:30:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "endTime":"2017-01-27T10:00:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "variantSku":"ocean_polar_1606_FLL-640B",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "guestId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "createdBy":"149673",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "purchaser":"143679",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventId":"ocean_polar_1606_FLL-640",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "resourceId":"",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "surpriseFlag":false,' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "venueId":"FLL001",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "status":"CONFIRMED",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "primaryId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='}' 

答えて

0

あなたmesssagesはJSONから変換するためにJSONドキュメント

received message='{' 
received message=' "_eventType":"Reservation",' 
received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
... 

の個々のスニペットあり、それは単一のメッセージにカプセル化する必要があります。

+0

これは、プロデューサー側の責任であることを意味します。ありがとう@Gary! – Kuber

関連する問題