2017-07-26 31 views
1

私はSpringアプリケーションにRabbit MQブローカを統合しようとしています。私は正常にメッセージを消費することができますが、エラー処理を追加する必要があります。 リスナーはメッセージを消費し、DB書き込みを含むビジネス・ロジックを適用します。ビジネスロジックが例外をスローする可能性があります。私はSpring RabbitMQでのエラー処理

  1. ロールバックDbは書き込みに必要なこれらの例外の場合

  2. Dbのエラーテーブルに書き込み、msgの失敗を示します。
  3. メッセージを再キューに入れるべきではありません。​​3210でtxManagerを追加し、@Transactional

  4. 要件#2でListner.listen()方法を注釈を付けている - - エラーハンドラとDefaultExceptionStrategey

  5. のカスタム実装を追加した要件#1

    • については

  • 要件#3 - 設定済みDefaultRequeueRejected=false

  • しかしBusinessRuntimeExceptionはのErrorHandlerが呼び出さ取得していない、リスナーからスローされます。 不足しているものがわからない。 errorHandlerは一部の例外に対してのみ呼び出されますか?

    のConfig.xml

    <tx:annotation-driven transaction-manager="txManager" /> 
    <bean id="txManager" 
    class="org.springframework.transaction.jta.JtaTransactionManager"> 
    <property name="allowCustomIsolationLevels" value="true" /> 
    

    <rabbit:connection-factory id="rabbitConnectionFactory"/> 
    <rabbit:template id="rabbitTemplate" connection- 
    factory="rabbitConnectionFactory" message-converter="jsonMessageConverter" 
    channel-transacted="true"/> 
    <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/> 
    

    RabbitMQConfiguration.java

    @Configuration 
    @EnableRabbit 
    public class RabbitMqConfiguration { 
    
    @Autowired 
    private ConnectionFactory rabbitConnectionFactory; 
    
    @Autowired 
    private MessageConverter jsonMessageConverter; 
    
    @Bean 
    public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() { 
        SimpleRabbitListenerContainerFactory listenerContainer = new SimpleRabbitListenerContainerFactory(); 
        listenerContainer.setConnectionFactory(rabbitConnectionFactory); 
        listenerContainer.setMessageConverter(jsonMessageConverter); 
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); 
        listenerContainer.setChannelTransacted(true); 
        listenerContainer.setDefaultRequeueRejected(false); 
        listenerContainer.setErrorHandler(errorHandler()); 
        return listenerContainer; 
    } 
    
    @Bean 
    public ErrorHandler errorHandler() { 
        return new ConditionalRejectingErrorHandler(new ExceptionStrategy()); 
    } } 
    

    ExceptionStrategy.java

    public class ExceptionStrategy extends DefaultExceptionStrategy { 
    
    @Autowired 
    private Dao daoBean; 
    
    @Override 
    public boolean isFatal(Throwable t) { 
    
        if (t instanceof BusinessRuntimeException) { 
         BusinessRuntimeException businessException = (BusinessRuntimeException) t; 
         //db call 
         daoBean.updateRecordStaus(); 
         return true; 
        } 
    
        if (t instanceof ListenerExecutionFailedException) { 
         ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
         logger.error(
           "Failed to process inbound message from queue " + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
             + "; failed message: " + lefe.getFailedMessage(), 
           t); 
        } 
        return super.isFatal(t); 
    }} 
    

    答えて

    0

    あなたのBusinessRuntimeExceptionをRuntimeExceptionにラップします。

    catch(BusinessRuntimeException e) 
    { 
        throw new RuntimeException(e); 
    } 
    
    関連する問題