2016-10-28 10 views
1

Atomikosを介したJTAサポートとSpringインバウンドとアウトバウンドの異なるWebshpere MQにバインドされたJMSを使用しています。 フローは以下である:Spring IntegrationメッセージがerrorChannelに移動したときのJTAロールバック

  • JMSインバウンド・チャネル・アダプタが出力キューエラーが発生
  • JMSアウトバウンド・チャネル・アダプタをメッセージを受信
  • いくつかの変換、errorChannelは、メッセージを受信
  • 例外タイプルータは、未処理エラーをカスタム再利用サービスにルーティングし、2つのエラーキューに送信する受信者リストルータに処理します。

私の問題は、メッセージがerrorChannel(処理された例外ケースとエラーキュー)の下流にある場合でも、トランザクションをコミットしたいことです。 私が理解しているように、例外が再処理された場合にのみロールバックが発生する必要があります(その理由は未処理のものを再処理します)が、私の場合は、メッセージがerrorChannel(

私は間違っていますか?

設定は次のとおりです。

<jms:inbound-channel-adapter id="jms-in" 
          destination="input-queue" 
          connection-factory="inConnectionFactory" 
          channel="edi-inbound" 
          acknowledge="transacted"> 
    <poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}" 
      fixed-rate="${process.jms.inbound.poll.rate-millis:60000}" 
      > 
     <transactional timeout="${process.tx.timeout-sec:60}"/> 
    </poller> 
</jms:inbound-channel-adapter> 
<channel id="edi-inbound"/> 

<chain input-channel="edi-inbound" output-channel="edi-transformation-chain"> 
    <object-to-string-transformer/> 
    <service-activator ref="inbound" method="service"/> 
</chain> 

<!-- edifact transformation flow --> 
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose"> 
    <transformer ref="edi2xml-converter"/> 
    <transformer ref="xml-mapper"/> 
</chain> 




<chain input-channel="outbound-message-compose" output-channel="outbound-channel"> 
    <service-activator ref="outbound-message-composer" /> 
</chain> 

<channel id="outbound-channel"> 
    <interceptors> 
     <beans:ref bean="outbound-interceptor" /> 
    </interceptors> 
</channel> 

<recipient-list-router input-channel="outbound-channel"> 
    <recipient channel="file-outbound"/> 
    <recipient channel="queue-outbound"/> 
</recipient-list-router> 



<channel id="queue-outbound"/> 
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/> 



<channel id="file-outbound"/> 
<file:outbound-channel-adapter id="file-outbound" 
            directory="${output.directory}" 
            filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'" 
            delete-source-files="true" /> 




<!-- notification outbound flow --> 
<channel id="errorChannel"> 
    <interceptors> 
     <wire-tap channel="logger"/> 
    </interceptors> 
</channel> 
<logging-channel-adapter id="logger" level="INFO"/> 

<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel"> 
    <mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" /> 
</exception-type-router> 

<recipient-list-router input-channel="error-notification-channel"> 
    <recipient channel="queue-outbound-error"/> 
    <recipient channel="queue-inbound-error"/> 
</recipient-list-router> 

<chain input-channel="queue-outbound-error"> 
    <service-activator ref="outbound-error-composer" /> 
    <jms:outbound-channel-adapter id="jms-out-error" 
            destination="error-output-queue" 
            connection-factory="outConnectionFactory" 
            session-transacted="true"/> 
</chain> 

<chain input-channel="queue-inbound-error"> 
    <service-activator ref="error-notif-composer" /> 
    <jms:outbound-channel-adapter id="jms-in-error" 
            destination="error-input-queue" 
            connection-factory="outConnectionFactory" 
            session-transacted="true"/> 
</chain> 


<channel id="unhandled-error-channel" /> 
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/> 

エラーチャネルのTXのロールバック、両方のエラー・キューは、どのような場合にメッセージ(アウトバウンドアダプタがトランザクションに参加しないかのように)、および通常のためのTXを受信したときだけ、完全なものにしますフロー(エラーが発生していない場合)は完全に機能します。

答えて

0

これは間違いありません。

Polling Inbound Channel Adapterを使用しているためです。それはロジックが似ているのです:あなたTXはpollingTaskプロキシの一部である

AbstractPollingEndpoint.this.taskExecutor.execute(() -> { 
    ... 
        if (!Poller.this.pollingTask.call()) { 
         break; 
        } 
    ... 
       catch (Exception e) { 
        if (e instanceof RuntimeException) { 
         throw (RuntimeException) e; 
        } 
        else { 
         throw new MessageHandlingException(new ErrorMessage(e), e); 
        } 
       } 
      } 
     }); 

、AOP TransactionInterceptorとして。

errorChannelは、this.taskExecutorErrorHandlerの一部です。 pollingTaskから例外が発生した場合にのみerrorChannelに届きます。私たちはそこにテキサス州を持っているので、それは当然反論される。

私のポイントは:Polling Inbound Channel Adapterのエラー処理プロセスはTXの外で行われます。

<int-jms:message-driven-channel-adapter>に切り替えることを検討してください。

+0

私は、プロダクション環境を中心に、ある時点でメッセージドリブンチャネルアダプタに切り替える予定でしたが、インバウンドチャネルアダプタとポーラーで同じ結果が得られることを期待しました。ポーラー版であっても、TX内にエラー処理プロセスを導入する方法はないと思いますか? – nex

+0

実際、JMSはストリーミングプロトコルであるため、常に宛先を聴くためには本当にオーガニックになります。 JMSのポーリングアダプタが必要な場合はまれなケースです –

+0

代わりに、メッセージを消費するアダプタを「ゆっくり」設定することもできます。つまり、消費の割合を決めることはできますか? – nex

関連する問題