2017-10-24 13 views
0

同じトピック上の異なるJSONペイロード(例えば、フー、バー、カー...)春カフカ:私は同じカフカトピックに異なるJSONペイロードを送信する必要が

スプリングカフカに基づいて、親クラスを使用せずドキュメント、私はクラスレベルで@KafkaListenerを使用し、メソッドレベルで@KafkaHandlerを指定します(doc

@KafkaListener(topics = "myTopic") 
static class MultiListenerBean { 

    @KafkaHandler 
    public void listen(Foo foo) { 
     ... 
    } 

    @KafkaHandler 
    public void listen(Bar bar) { 
     ... 
    } 

    @KafkaHandler 
    public void listen(Car car) { 
     ... 
    } 
} 

が、私はこの例外を取得することができます:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"}) 
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap 
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92) 
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147) 
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60) 
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) 
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) 
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.lang.Thread.run(Thread.java:745) 

は、私はまた、このアプローチを試してみましたが、働いていない:

@KafkaListener(topics = "myTopic") 
    public void receive(ConsumerRecord<String, ?> payload) 
    { 
     if (payload.value() instanceof Foo) 
     { 
     // 
     } 
     else if (payload.value() instanceof Car) 
     { 
     // 
     } 
    } 

私は春のカフカを使用して、同じトピック上の異なるJSONペイロードを送信するために私の生産者と消費者を設定するにはどうすればよいですか?

+0

JSONからどのように逆シリアル化しますか。 –

答えて

0

JSONメソッドとマルチメソッドリスナーを使用する場合、catch-22があります。適切な方法にルーティングするための型を知る必要があります。メソッドのシグネチャからの変換の型を推測することはできません。なぜなら、どのメソッドを呼び出すかわからないからです。

データから、またはHeader(カフカ11)を使用して、タイプを把握できるメッセージコンバーターが必要です。

次に、コンバータが適切な型に変換された後、呼び出すメソッドを特定できます。

ConsumerRecordには、未処理のJSONが含まれています。変換を行うにはObjectMapperを使用する必要がありますが、変換するタイプについてはヒントが必要です(または無差別に成功するまで)。

関連する問題