同じトピック上の異なるJSONペイロード(例えば、フー、バー、カー...)春カフカ:私は同じカフカトピックに異なるJSONペイロードを送信する必要が
スプリングカフカに基づいて、親クラスを使用せずドキュメント、私はクラスレベルで@KafkaListener
を使用し、メソッドレベルで@KafkaHandler
を指定します(doc)
@KafkaListener(topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(Foo foo) {
...
}
@KafkaHandler
public void listen(Bar bar) {
...
}
@KafkaHandler
public void listen(Car car) {
...
}
}
が、私はこの例外を取得することができます:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"})
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147)
at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
は、私はまた、このアプローチを試してみましたが、働いていない:
@KafkaListener(topics = "myTopic")
public void receive(ConsumerRecord<String, ?> payload)
{
if (payload.value() instanceof Foo)
{
//
}
else if (payload.value() instanceof Car)
{
//
}
}
私は春のカフカを使用して、同じトピック上の異なるJSONペイロードを送信するために私の生産者と消費者を設定するにはどうすればよいですか?
JSONからどのように逆シリアル化しますか。 –