2016-04-13 6 views
1

私は、メッセージとポストを格納しているスロットルと呼ばれるspring xdコンポーネントを持っていて、それらをスロットルしてゆっくり送ります。私は3つのコンテナでxdを実行しています。 1つのコンテナが落ちます。私はメッセージがゆるいです。下は私の構成です。ポストグレイに格納しておらず、メモリやキャッシュを失っているようです。メッセージを緩まないように設定する

<?xml version="1.0" encoding="UTF-8"?> 
<beans:beans xmlns="http://www.springframework.org/schema/integration" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:beans="http://www.springframework.org/schema/beans" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int-groovy="http://www.springframework.org/schema/integration/groovy" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc" 
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:ehcache="http://www.springframework.org/schema/cache" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd 
     http://www.springframework.org/schema/integration 
     http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd 
     http://www.springframework.org/schema/integration/jdbc 
     http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd 
     http://www.springframework.org/schema/integration/groovy 
     http://www.springframework.org/schema/integration/groovy/spring-integration-groovy.xsd 
     http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.1.xsd"> 



    <ehcache:annotation-driven cache-manager="cacheManager" /> 

    <context:property-placeholder 
     location="classpath*:dms-security-${region}.properties, classpath*:dms-jms-${region}.properties, classpath*:dms-mongo-${region}.properties" /> 

    <context:component-scan base-package="com.testhrottle.api.dms.*" 
     use-default-filters="false"> 
     <context:include-filter type="annotation" 
      expression="org.springframework.context.annotation.Configuration" /> 
    </context:component-scan> 

    <channel id="input"> 
     <!-- <interceptors> 
      <ref bean="messageInterceptor" /> 
     </interceptors> --> 
    </channel> 

    <channel id="output" /> 

    <channel id="sms" /> 

    <channel id="eml" /> 

    <channel id="del" /> 

    <!-- <header-value-router input-channel="input" header-name="Channel-Type" > 
     <mapping value="PHN" channel="sms" /> 
     <mapping value="EML" channel="eml" /> 
     <mapping value="DEL" channel="del" /> 
    </header-value-router> --> 

    <router input-channel="input"> 
     <beans:bean name="messageRouter" class="com.testhrottle.xd.util.MessageRouter"> 
      <beans:property name="cache" ref="configCache" /> 
     </beans:bean> 
    </router> 

    <service-activator input-channel="del" ref="delete" method="delete"/> 

    <filter input-channel="eml" output-channel="output"> 
     <beans:bean class="com.testhrottle.xd.throttle.Throttle"> 
      <beans:constructor-arg value="${tpsThreshold}" /> 
      <beans:constructor-arg value="${spanSeconds}" /> 
      <beans:property name="cache" ref="configCache" /> 
      <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" /> 
      <beans:property name="errorPublishing" ref="errorPublishing" /> 
     </beans:bean> 
    </filter> 

    <filter input-channel="sms" output-channel="output"> 
     <beans:bean class="com.testhrottle.xd.throttle.SmsThrottle"> 
      <beans:constructor-arg value="${smsTpsThreshold}" /> 
      <beans:constructor-arg value="${smsSpanSeconds}" /> 
      <beans:property name="cache" ref="configCache" /> 
      <beans:property name="dailyLimitCounter" ref="dailyLimitCounter" /> 
      <beans:property name="errorPublishing" ref="errorPublishing" /> 
     </beans:bean> 
    </filter> 

    <channel id="requeue"> 
     <dispatcher task-executor="taskExecutor" /> 
    </channel> 

    <task:executor id="taskExecutor" pool-size="1" /> 

    <delayer id="delayer" input-channel="requeue" default-delay="${holdingDelay}" 
     expression="headers['delay']" message-store="messageStore" 
     output-channel="input" /> 


    <!-- <int-jdbc:message-store id="messageStore" data-source="dataSource" /> --> 

    <beans:bean id="messageStore" class="com.testhrottle.xd.util.CustomJdbcMessageStore"> 
     <beans:constructor-arg ref="dataSource"/> 
    </beans:bean> 

    <!-- <beans:bean name="messageInterceptor" class="com.testhrottle.xd.util.MessageInterceptor"> 
     <beans:property name="cache" ref="configCache" /> 
    </beans:bean> --> 

    <beans:bean name="configCache" 
     class="com.testhrottle.xd.cache.ConfigCache"> 
     <beans:property name="jdbcTemplate" ref="jdbcTemplate" /> 
     <beans:property name="holdRetryIntervelInSeconds" value="${holdRetryIntervelInSeconds}" /> 
     <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay,purge_ind purge from ${schema}.alrt_type where alrt_type_cd = ?" /> 
    </beans:bean> 

    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource"> 
     <beans:property name="driverClassName" value="org.postgresql.Driver"/> 
     <beans:property name="url" value="${dbURL}"/> 
     <beans:property name="username" value="${userName}"/> 
     <beans:property name="password" value="#{encryptedDatum.decryptBase64Encoded('${passWord}')}"/> 
    </beans:bean> 

    <beans:bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> 
     <beans:property name="dataSource"> 
      <beans:ref bean="dataSource" /> 
     </beans:property> 
    </beans:bean> 

    <beans:bean id="encryptedDatum" class="com.testhrottle.api.dms.core.security.EncryptedSecuredDatum"/> 

    <!-- <beans:bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"/> --> 

    <beans:bean id="errorPublishing" class="com.testhrottle.xd.util.ErrorPublishing"> 
     <!-- <beans:property name="rabbitTemplate" ref="rabbitTemplate" /> --> 
    </beans:bean> 

    <beans:bean id="delete" class="com.testhrottle.xd.util.Delete"/> 

    <!-- 
    <beans:bean id="dataSource" 
    class="org.springframework.jdbc.datasource.DriverManagerDataSource"> 
     <beans:property name="driverClassName" value="org.postgresql.Driver"/> 
     <beans:property name="url" value="jdbc:postgresql://localhost:5432/postgres"/> 
     <beans:property name="username" value="postgres"/> 
     <beans:property name="password" value="postgres"/> 
    </beans:bean> 
    <beans:bean name="configCache" 
     class="com.testhrottle.xd.cache.ConfigCache"> 
     <beans:property name="jdbcTemplate" ref="jdbcTemplate" /> 
     <beans:property name="holdRetryIntervelInSeconds" value="10" /> 
     <beans:property name="configQuery" value="select alrt_type_cd alertType,dlvry_days_of_week daysOfWeek,DLVRY_STRT_TM startTime,DLVRY_END_TM endTime,DLVRY_STRT_DT startDate,DLVRY_END_DT endDate,hold_ind holdFlag, max_msgs_per_day maxMsgPerDay from dms_wb.alrt_type where alrt_type_cd = ?; 
     " /> 
    </beans:bean> 

    <beans:bean name="cacheTest" 
     class="com.testhrottle.xd.cache.CacheTest"> 
     <beans:property name="cache" ref="configCache" /> 
    </beans:bean> --> 

    <beans:bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/> 


    <beans:bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"> 
     <beans:property name="cacheManager" ref="ehcache" /> 
    </beans:bean> 

    <beans:bean id="ehcache" 
     class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"> 
     <beans:property name="configLocation" value="classpath:ehcache.xml" /> 
     <beans:property name="shared" value="true" /> 
    </beans:bean> 

    <beans:bean name="dailyLimitCounter" class="com.testhrottle.xd.throttle.DailyLimitCounter"/> 

    <task:scheduler id="taskSchedulerInbound" pool-size="10" /> 

    <task:scheduled-tasks> 
     <task:scheduled ref="dailyLimitCounter" method="clearAlertDailyCount" cron="59 59 23 * * *" /> 
    </task:scheduled-tasks> 

</beans:beans> 

これを削除すると、同じスレッドで動作するはずです。

<channel id="requeue"> 
    <dispatcher task-executor="taskExecutor" /> 
</channel> 

<task:executor id="taskExecutor" pool-size="1" /> 

答えて

2

あなたの<dispatcher task-executor="taskExecutor" />は有罪と思われます。

メッセージバスのスレッドから飛び出したので、そのメッセージはブローカでメッセージをコミット(コミット)しますが、ノードのクラッシュはそれにはまだ影響しません(rallback)。メッセージは別の糸。そして、あなたのDBにも届いていません。

一般に、コンテナの(XD)スレッドを使用してSpring XDのすべてを実行し、メッセージバスの永続メカニズムに依存する方がよいでしょう。 または、DBに手動でメッセージをコミットしますが、同じXDスレッドにコミットします。

意味がありますか?

+0

意味が分かりますが、更新された質問を確認してください。 – constantlearner

+0

メッセージバススレッドで、したがって、TXまたはその他のack/nack境界で。 –

関連する問題