2017-11-21 5 views
0

私はクライアントにcontent_type application/jsonを提供する必要はありませんが、デフォルトにしています。私はこれを働かせた。デフォルトのcontent_type application/jsonにDefaultExceptionStatgyから無効化されたisFatalがあります

また、別の例と組み合わせて、ConditionalRejectingErrorHandlerからカスタムisFatal(Throwable t)を実装しようとしました。カスタムエラーハンドラを起動させることはできますが、それでもcontent_typeプロパティが必要になるようです。私はそれらを同時に両方の方法で動かす方法を理解できません。

アイデア?

以下は正常に動作しますcontent_type 編集:以下のコードは私が思ったように機能しません。プロパティCONTENT_TYPEアプリケーション/ JSONと、キュー内の古いメッセージがConditionalRejectingErrorHandlerでisFatalを()をオーバーライドするには、以下のここ

@EnableRabbit 
@Configuration 
public class ExampleRabbitConfigurer implements 
RabbitListenerConfigurer { 

@Value("${spring.rabbitmq.host:'localhost'}") 
private String host; 

@Value("${spring.rabbitmq.port:5672}") 
private int port; 

@Value("${spring.rabbitmq.username}") 
private String username; 

@Value("${spring.rabbitmq.password}") 
private String password; 

@Autowired 
private MappingJackson2MessageConverter mappingJackson2MessageConverter; 

@Autowired 
private DefaultMessageHandlerMethodFactory messageHandlerMethodFactory; 

@Bean 
public MappingJackson2MessageConverter mappingJackson2MessageConverter() { 
    return new MappingJackson2MessageConverter(); 
} 

@Bean 
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() { 
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
    factory.setMessageConverter(mappingJackson2MessageConverter); 
    return factory; 
} 

@Override 
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) { 
    registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory); 
} 

作品に引っ張られている必要があります。 SimpleRabbitListenerContainerFactory.setMessageConverter()は、DefaultMessageHandlerMethodFactory.setMessageConverter()と同じ目的を果たすようです。明らかに、これは当てはまりません。

@EnableRabbit 
@Configuration 
public class ExampleRabbitConfigurer { 

@Value("${spring.rabbitmq.host:'localhost'}") 
private String host; 

@Value("${spring.rabbitmq.port:5672}") 
private int port; 

@Value("${spring.rabbitmq.username}") 
private String username; 

@Value("${spring.rabbitmq.password}") 
private String password; 

@Autowired 
ConnectionFactory connectionFactory; 

@Autowired 
Jackson2JsonMessageConverter jackson2JsonConverter; 

@Autowired 
ErrorHandler amqpErrorHandlingExceptionStrategy; 

@Bean 
public Jackson2JsonMessageConverter jackson2JsonConverter() { 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean 
public ErrorHandler amqpErrorHandlingExceptionStrategy() { 
    return new ConditionalRejectingErrorHandler(new AmqpErrorHandlingExceptionStrategy()); 
} 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setMessageConverter(jackson2JsonConverter); 
    factory.setErrorHandler(amqpErrorHandlingExceptionStrategy); 
    return factory; 
} 


public static class AmqpErrorHandlingExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { 

    private final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(getClass()); 

    @Override 
    public boolean isFatal(Throwable t) { 

     if (t instanceof ListenerExecutionFailedException) { 
      ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
      LOGGER.error("Failed to process inbound message from queue " 
        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
        + "; failed message: " + lefe.getFailedMessage(), t); 
     } 
     return super.isFatal(t); 
    } 

} 

答えて

0

インバウンドメッセージにcontentTypeヘッダを追加する「の後に受信」MessagePostProcessorを使用します。

バージョン2.0以降、MPPをコンテナファクトリに追加できます。あなたが見ることができるようにあなたが...

@SpringBootApplication 
public class So47424449Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So47424449Application.class, args); 
    } 

    @Bean 
    public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, RabbitTemplate template) { 
     return args -> { 
      SimpleMessageListenerContainer container = 
        (SimpleMessageListenerContainer) registry.getListenerContainer("myListener"); 
      container.setAfterReceivePostProcessors(m -> { 
       m.getMessageProperties().setContentType("application/json"); 
       return m; 
      }); 
      container.start(); 

      // send a message with no content type 
      template.setMessageConverter(new SimpleMessageConverter()); 
      template.convertAndSend("foo", "{\"bar\":\"baz\"}", m -> { 
       m.getMessageProperties().setContentType(null); 
       return m; 
      }); 
      template.convertAndSend("foo", "{\"bar\":\"qux\"}", m -> { 
       m.getMessageProperties().setContentType(null); 
       return m; 
      }); 
     }; 
    } 

    @Bean 
    public Jackson2JsonMessageConverter converter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @RabbitListener(id = "myListener", queues = "foo", autoStartup = "false") 
    public void listen(Foo foo) { 
     System.out.println(foo); 
     if (foo.bar.equals("qux")) { 
      throw new MessageConversionException("test"); 
     } 
    } 

    public static class Foo { 

     public String bar; 

     public String getBar() { 
      return this.bar; 
     } 

     public void setBar(String bar) { 
      this.bar = bar; 
     } 

     @Override 
     public String toString() { 
      return "Foo [bar=" + this.bar + "]"; 
     } 

    } 

} 

を再構成することができ、以前のバージョンの

、それはソースメッセージを修正するため、変更されたヘッダーは、エラーハンドラで提供され...

2017-11-22 09:39:26.615 WARN 97368 --- [cTaskExecutor-1] ingErrorHandler $ DefaultExceptionStatgy:致命的なメッセージ変換エラーです。メッセージは拒否されました。 (Body: '{"bar": "qux"}' MessageProperties [headers = {}、contentType = application/json、contentEncoding = UTF-8のようにデッド・レター・エクスチェンジに送られます。 、contentLength = 0、receivedDeliveryMode = PERSISTENT、priority = 0、redelivered = false、receivedExchange =、receivedRoutingKey = foo、deliveryTag = 2、consumerTag = amq.ctag-re1kcxKV14L_nl186stM0w、consumerQueue = foo])、contentType = application/json、contentEncoding =再配信UTF-8、ん。ContentLength = 0、receivedDeliveryMode = PERSISTENT、優先度= 0、=偽、receivedExchange =、receivedRoutingKey = FOO、deliveryTag = 2、consumerTag = amq.ctag-re1kcxKV14L_nl186stM0w、consumerQueue = FOO])

+0

と2.0では、MPPをコンテナファクトリに追加できます。 –

+0

おかげでゲーリー、afterReceivePostProcessorは私が必要としていたものでした。あなたのサンプルを自分のコードにうまく適応させることができました。 – JvmSd121

関連する問題