2016-10-18 9 views
0

私のトランザクションがロールバックとしてマークされたときにSimpleMessageListenerContainerがメッセージを継続的に消費しないようにしたいと思います。SimpleMessageListenerContainerはTransactionSystemExceptionの後も継続的に消費されています

設定ファイルにlistener-containerを追加せず、を再送信しようとしました。それは動作していません。 ここに私のコードベースである:

@Transactional 
public class MySecondService { 

    @Resource 
    private MySecondRepository mySecondRepository; 

    @Transactional(propagation = Propagation.REQUIRED) 
    @ServiceActivator(inputChannel = "my-first-servie-output-channel", 
         outputChannel = "my-Second-servie-output-channel") 
    public String saveEntity(final MyTestEntity myTestEntity) 
     throws AmqpRejectAndDontRequeueException { 
     try { 
      mySecondRepository.save(myTestEntity); 
     } catch (Exception e) { 
      throw new AmqpRejectAndDontRequeueException("exception"); 
     } 
    } 
} 

春-統合のcontext.xml

<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel"> 
    </int:chain> 
<int-amqp:inbound-channel-adapter 
      channel="transaction-inbound-channel" 
      queue-names="sample.queue" 
      concurrent-consumers="5" 
      error-channel="failed-channel" 
      connection-factory="rabbitConnectionFactory" 
      mapped-request-headers="*" 
      transaction-manager="transactionManager" /> 

    <rabbit:connection-factory 
      id="rabbitConnectionFactory" 
      connection-factory="rcf" 
      host="${spring.rabbitmq.host}" 
      port="${spring.rabbitmq.port}" 
      username="${spring.rabbitmq.username}" 
      password="${spring.rabbitmq.password}" 
      /> 
    <bean id="rcf" class="com.rabbitmq.client.ConnectionFactory"> 
     <property name="host" value="${spring.rabbitmq.host}"/> 
     <property name="requestedHeartbeat" value="10" /> 
    </bean> 

これは、スタックトレースです:私は、これはorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer

が起こるしたくない

WARN [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it 
org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly 
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:526) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1062) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55] 
Caused by: javax.persistence.RollbackException: Transaction marked as rollbackOnly 
    at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:74) ~[hibernate-entitymanager-4.3.11.Final.jar:4.3.11.Final] 
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    ... 7 common frames omitted 
2016-10-19 00:52:50,673 [SimpleAsyncTaskExecutor-1] INFO [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tags=[{amq.ctag-QBWGIY7co5vUpXwGDXDDBg=sample.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:9999/,2), acknowledgeMode=AUTO local queue size=0 

もう一度私が何かを助けてAmqpRejectAndDontRequeueException。前もって感謝します。

答えて

0

フローを表示しません。

単一のサービスのみを呼び出す場合は、トランザクションマネージャをインバウンドチャネルアダプタに追加しないでください。

サービスが呼び出されるとトランザクションが開始されます。 failed-channelのエラーフローが例外をスローしない限り(またはAmqpRejectAndDontRequeueExceptionをスローした場合)、メッセージはキューに入れられません。

1つのトランザクションに複数のサービスを参加させる場合は、AOPアドバイスを使用してtransaction-inbound-channelをトランザクションにすることができ、ダウンストリームフローはすべてそのトランザクション内で実行されます。

または、場合メッセージが再キューイングしたい、(あなたが<bean/>としてアップ配線する必要があります)インバウンドアダプタのコンテナにfalsedefaultRequeueRejectedを設定することはありません。

+0

固定 - 'defaultRequeueRejected'はデフォルトでtrueです。常に拒否するために「偽」する必要があります。 –

+0

エラーフローから 'AmqpRejectAndDontRequeueException'を投げてみることができます。コミットを回避する必要があります。 –

+0

Garyに感謝します。これを私の 'error-channel'に追加しました。 (例外Exception e)throws AmqpRejectAndDontRequeueException { 新しいAmqpRejectAndDontRequeueException( "例外。")をスローします。 } '。 –

関連する問題