0

私はspring-boot 1.4.2とrabbitMQ 1.6.5.RELEASE(spring-boot-starter-rabbitを使用していません)を使用しています。私は私のプロジェクトで複数のマイクロサービスを持っており、ほとんどのマイクロサービスにはrabbitMQの消費者が含まれています。プロジェクトの1つがメッセージを生成します。メッセージがqueue.Ifで利用可能であるにもかかわらず、キューからメッセージを受け取ることを止める消費者はほとんどありません。特定のコンシューマを再起動すると、そのメッセージを消費し始めます。コンシューマー・マイクロ・サービスの誰かを再配備すると、他のコンポーネントの消費者が再びキューからメッセージを消費することはなくなりますが、キューではメッセージが表示されます。SpringブートrabbitMQコンシューマがjson変換エラーのためにメッセージを受け取るのを停止します

ログに見た後、私は以下の問題に

com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(com.vodafone.smartlife.provisioning.common.model.Message) 
throws com.sample.global.bookStateUpdate.consumer.BookFailConsumer.exception.ConsumerException' threw exception 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:138) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:105) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:778) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:701) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1213) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:682) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1175) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1338)\n\tat java.lang.Thread.run(Thread.java:745)\n 
Caused by: java.lang.NullPointerException: null 
com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(BookFailConsumer.java:54) 
sun.reflect.GeneratedMethodAccessor149.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498)org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) 
    org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) 
org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:125)... 12 common frames omitted\n 

を見つけることができますので、ジャクソンがJSONに私のメッセージを変換することができませんでしたようです。実際のオブジェクトを消費するのではなく、私の消費者では、私はorg.springframework.amqp.core.Messageを消費しています。それを手動で、私のカスタムオブジェクトに変換しています。

なぜ私は春のウサギがメッセージをjsonに変換できなかったのでしょうか?

見つけてください以下の構成&消費者のファイルの変更

package com.sample.global.rabbit.configuration; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.test.RabbitListenerTest; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.Primary; 

@Configuration 
@EnableRabbit 
@RabbitListenerTest(capture = true, spy = true) 
public class RabbitMqConfiguration { 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrentConsumers(15); 
    factory.setMaxConcurrentConsumers(15); 
    factory.setMessageConverter(jsonMessageConverter()); 

    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory() 
{ 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://localhost:15672"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setRequestedHeartBeat(10); 
    return connectionFactory; 
} 

@Bean 
public MessageConverter jsonMessageConverter() 
{ 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean(name = "MainTemplate") 
@Primary 
public RabbitTemplate rabbitTemplate() 
{ 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

} 

消費者

package com.sample.global.rabbit.consumer; 

import org.springframework.amqp.core.Message; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.rabbit.annotation.Exchange; 
import org.springframework.amqp.rabbit.annotation.Queue; 
import org.springframework.amqp.rabbit.annotation.QueueBinding; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.stereotype.Component; 

@Component 
public class BookFailConsumer { 

private static final Logger logger = LoggerFactory.getLogger(BookFailConsumer.class); 

private static final ObjectMapper objectMapper = new ObjectMapper(); 

@Autowired 
@Qualifier(value = "MainTemplate") 
private RabbitTemplate rabbitTemplate; 

@RabbitListener(
     id = "book", 
     bindings = @QueueBinding(
       value = @Queue(value = "sample.queue", durable = "true"), 
       exchange = @Exchange(value = "sample.exchange", durable = "true", delayed = "true"), 
       key = "sample.queue" 
     ) 
) 
public void handle(Message messageObject) { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    try { 
     message = convertMessageBodyToTransaction(messageObject.getBody()); 
     //After manual JSON conversion it works fine. 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public Message convertMessageBodyToTransaction(byte[] messageBody) throws BadRequestException { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    String body = null; 
    try { 
     body = new String(messageBody, "UTF-8"); 
     logger.debug("Message body converted successfully to string: {}", body); 
    } catch (UnsupportedEncodingException e) { 
     throw new BadRequestException(e); 
    } 
    try { 
     message = objectMapper.readValue(body, Message.class); 
     logger.debug("Message body mapped successfully to Message object: {}", message.toString()); 
    } catch (Exception e){ 
     logger.error("Message conversion failed for the following message body: {}", body); 
     throw new BadRequestException(e); 
    } 
    return message; 
} 

}

手動変換を避けるためにどのような方法がありますか?任意のヒントが

+0

完全なスタックトレースを表示する必要があります。奇妙な書式も修正する必要があります。 –

+0

@GaryRussell完全なスタックトレースを更新してビットをフォーマットしました。あなたの助けは相当なものでなければなりません。 – VelNaga

+0

@GaryRussellもうコードを投稿しますか? – VelNaga

答えて

0

この問題を解決することは有用であろうログが示した理由は、したがって、この消費者は、あなたが期待するように見えるよう、BookConsumer BookFailConsumer.execute

でNullpointerExceptionが発生メッセージを受信して​​いなかったです。 BookFailConsumerが呼び出された理由とそこで何が起こっていないのかを確認する必要があります。 (あなたはBookFailConsumerのコードを投稿していません)

+0

ご意見ありがとうございます。実際には、コードを投稿するときにタイプミスがあります。私は「BookFailConsumer」という消費者が1つしかありません。私は消費者の中に "org.springframework.amqp.core.Message"を得ることができることを私が述べた "BookConsumer"と呼ばれる消費者はいません。 – VelNaga

関連する問題