2016-09-01 13 views
0

私はRabbitMQキューからメッセージを消費するためにSpring SimpleMessageListenerContainerを使用しています。すべて正常に動作しますが、無効なメッセージ(たとえば無効なjson)がキューに送信されると、リスナーはただ停止し、ワーカーをシャットダウンし、それ以上のメッセージは受け付けません。RabbitMQのSpring SimpleMessageListenerContainerが無効なメッセージでアボートしています

壊れたメッセージを破棄し、それ以降のメッセージを引き続き受信するように設定することはできますか。

私は私の設定は次のようになりますスプリントウサギ-1.6.1.RELEASE.jar

を使用しています:

@Bean 
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
               MessageListenerAdapter listenerAdapter, 
               MessageConverter messageConverter) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("my.queue"); 
    container.setMessageListener(listenerAdapter); 
    container.setMessageConverter(messageConverter); 
    return container; 
} 

@Bean 
public MessageConverter messageConverter() { 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean 
MessageListenerAdapter listenerAdapter(Worker worker) { 
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage"); 
    messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter()); 
    return messageListenerAdapter; 
} 

私のリスナーメソッドの宣言:

public void processMessage(Map<String, String> message) { 

'"routeId":"7"}'(壊れたjson)のようなメッセージを送信すると、例外が発生します。

2016-09-02 08:10:35.821 WARN 35841 --- [ container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. 

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) 
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] 
at  org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 12 common frames omitted 

2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29]  o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing 

org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 1 common frames omitted 
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) 
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] 
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 12 common frames omitted 

2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer 
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish. 
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish. 

SimpleMessageListenerContainerで致命的な例外がスローされ、ここで:

catch (ListenerExecutionFailedException ex) { 
        // Continue to process, otherwise re-throw 
        if (ex.getCause() instanceof NoSuchMethodException) { 
         throw new FatalListenerExecutionException("Invalid listener", ex); 
        } 
       } 

だから、コンテナが存在しない方式で構成されている場合、シャットダウンするようになっているようです。しかし、メッセージが壊れている場合は、間違ったパラメータ型のメソッドを呼び出そうとしています。このメソッドもNoSuchMethodExceptionを引き起こします。これは、どのプロデューサーも壊れたメッセージで消費者を殺すことができることを意味します。

ありがとうございました!

+0

私はあなたが 'によって何を意味するかわからない。しかし壊れたメッセージの場合には、またNoSuchMethodException.'を引き起こし、間違ったパラメータの型とメソッドを呼び出ししようとしています。あなたは完全なスタックトレースを表示する必要がありますが、編集したものではなく、どのバージョンを使用していますか?通常、悪いJSONは特別に扱われる 'MessageConversionException'を引き起こし、そのようなメッセージは拒否されます。 –

+0

@GaryRussellご意見ありがとうございます。私は、完全なstacktraceとさらに多くの設定を追加しました。それは、jsonを解析しようとしているときに失敗しただけでなく、Stringを引数としてリスナーメソッド( 'processMessage')を呼び出そうとしているようです。 – johannesv

+0

私の回答を参照してください。 - [JIRA Issue](https://jira.spring.io/browse/AMQP-644)も開きました。 –

答えて

1

興味深い。私はあなたの問題を再現できました。メッセージに__TypeID__ヘッダー(変換ヒント)が含まれていない場合は、単に「不良」jsonを文字列として返します。

私は変換器にカスタムクラスマッパーを挿入することでそれを解決することができました。

送信側のシステムにタイプヘッダーを設定させることもできます。

次に、MessageConversionExceptionがあるため、メッセージは拒否されます。

package com.example; 

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

import org.springframework.amqp.core.MessageProperties; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 
import org.springframework.amqp.support.converter.ClassMapper; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.amqp.support.converter.SimpleMessageConverter; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 

@SpringBootApplication 
public class So39264965Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend("my.queue", new Foo()); 
     context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS); 

     // bad json 
     template.setMessageConverter(new SimpleMessageConverter()); 
     template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> { 
      m.getMessageProperties().setContentType("application/json"); 
      return m; 
     }); 


     Thread.sleep(60000); 
     context.close(); 
    } 

    @Bean 
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
      MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) { 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
     container.setConnectionFactory(connectionFactory); 
     container.setQueueNames("my.queue"); 
     container.setMessageListener(listenerAdapter); 
     container.setMessageConverter(messageConverter); 
     return container; 
    } 

    @Bean 
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setMessageConverter(messageConverter()); 
     return rabbitTemplate; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("my.queue"); 
    } 

    @Bean 
    public MessageConverter messageConverter() { 
     Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); 
     jackson2JsonMessageConverter.setClassMapper(new ClassMapper() { 

      @Override 
      public Class<?> toClass(MessageProperties properties) { 
       return Foo.class; 
      } 

      @Override 
      public void fromClass(Class<?> clazz, MessageProperties properties) { 

      } 

     }); 
     return jackson2JsonMessageConverter; 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Worker worker) { 
     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage"); 
     messageListenerAdapter.setMessageConverter(messageConverter()); 
     return messageListenerAdapter; 
    } 

    @Bean 
    public Worker worker() { 
     return new Worker(); 
    } 

    public static class Worker { 

     private final CountDownLatch latch = new CountDownLatch(1); 

     public void processMessage(Foo foo) { 
      System.out.println(foo); 
      this.latch.countDown(); 
     } 

    } 

    public static class Foo { 

     private String bar = "bar"; 

     public String getBar() { 
      return this.bar; 
     } 

     public void setBar(String bar) { 
      this.bar = bar; 
     } 

     @Override 
     public String toString() { 
      return "Foo [bar=" + this.bar + "]"; 
     } 

    } 

} 
+0

ありがとう、それは動作します:-) – johannesv

関連する問題