私はSpringアプリケーションにRabbit MQブローカを統合しようとしています。私は正常にメッセージを消費することができますが、エラー処理を追加する必要があります。 リスナーはメッセージを消費し、DB書き込みを含むビジネス・ロジックを適用します。ビジネスロジックが例外をスローする可能性があります。私はSpring RabbitMQでのエラー処理
- ロールバックDbは書き込みに必要なこれらの例外の場合
。
- Dbのエラーテーブルに書き込み、msgの失敗を示します。
- メッセージを再キューに入れるべきではありません。3210で
txManager
を追加し、@Transactional
要件#2で
Listner.listen()
方法を注釈を付けている - - エラーハンドラとDefaultExceptionStrategey
のカスタム実装を追加した要件#1
については
要件#3 - 設定済みDefaultRequeueRejected=false
しかしBusinessRuntimeException
はのErrorHandlerが呼び出さ取得していない、リスナーからスローされます。 不足しているものがわからない。 errorHandler
は一部の例外に対してのみ呼び出されますか?
のConfig.xml
<tx:annotation-driven transaction-manager="txManager" />
<bean id="txManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="allowCustomIsolationLevels" value="true" />
<rabbit:connection-factory id="rabbitConnectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-
factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
channel-transacted="true"/>
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
RabbitMQConfiguration.java
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private MessageConverter jsonMessageConverter;
@Bean
public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() {
SimpleRabbitListenerContainerFactory listenerContainer = new SimpleRabbitListenerContainerFactory();
listenerContainer.setConnectionFactory(rabbitConnectionFactory);
listenerContainer.setMessageConverter(jsonMessageConverter);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
listenerContainer.setChannelTransacted(true);
listenerContainer.setDefaultRequeueRejected(false);
listenerContainer.setErrorHandler(errorHandler());
return listenerContainer;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new ExceptionStrategy());
} }
ExceptionStrategy.java
public class ExceptionStrategy extends DefaultExceptionStrategy {
@Autowired
private Dao daoBean;
@Override
public boolean isFatal(Throwable t) {
if (t instanceof BusinessRuntimeException) {
BusinessRuntimeException businessException = (BusinessRuntimeException) t;
//db call
daoBean.updateRecordStaus();
return true;
}
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error(
"Failed to process inbound message from queue " + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(),
t);
}
return super.isFatal(t);
}}