2016-08-31 7 views
0

私のアプリケーションをJMSキュー(ActiveMQを使用)と統合しようとしています。 私はSpring Integrationを統合コンポーネントとして使用しました。 接続プーリングが必要です。 'DefaultMessageListenerContainer'に 'maxConcurrentConsumers'を100として指定しました。Spring Integration | JMS(ActiveMq)を使用した接続プーリング

問題は、すべてのメッセージがキューから読み取られると、「Number of Consumers」はActiveMqコンソールに表示されるように100のままです。 データベース内で(JNDI経由で)接続プールを使用すると、接続が不要になればプールに戻され、ここでは発生していないオープン接続数が減少します。

これを処理するための指針は非常に役立ちます。

私のコードは以下の通りです:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:aop="http://www.springframework.org/schema/aop" 
xmlns:context="http://www.springframework.org/schema/context" 
xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
xmlns:jms="http://www.springframework.org/schema/jms" 
xmlns:int="http://www.springframework.org/schema/integration" 
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd 
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd 
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> 


<!-- Component scan to find all Spring components --> 
<context:component-scan base-package="com.poc.springinteg._7" /> 

<!-- --> 
<bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false"> 
    <property name="environment"> 
     <props> 
      <prop key="java.naming.provider.url">tcp://localhost:61616</prop> 
      <prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop> 
      <prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop> 
      <prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop> 
      <prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop> 
      <prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop> 
     </props> 
    </property> 
</bean> 

<bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false"> 
    <property name="jndiTemplate" ref="remoteJndiTemplate"/> 
    <property name="jndiName" value="QueueConnectionFactory"/> 
    <property name="lookupOnStartup" value="true" /> 
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" /> 
</bean> 

    <!-- writing queue --> 
    <bean id="destinationqueue"   
class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg index="0"> 
     <value>OutputQueue_7</value> 
     </constructor-arg> 
    </bean> 

<int:channel id="outbound"/> 

<int-jms:outbound-channel-adapter id="jmsOut" 
           channel="outbound" 
           connection-factory="remoteConnectionFactory" 
           destination="destinationqueue" /> 

    <!-- reading queue --> 
    <bean id="sourceQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg index="0"> 
      <value>OutputQueue_7</value> 
     </constructor-arg> 
    </bean> 

    <bean id="messageListenerContainer" 
class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="remoteConnectionFactory"/> 
    <property name="destination" ref="sourceQueue"/> 
    <property name="maxConcurrentConsumers" value="10"/> 
    <property name="concurrentConsumers" value="1"/> 
    <property name="autoStartup" value="true"/>   
</bean> 

<int:channel id="inbound"/> 

<int-jms:message-driven-channel-adapter id="jmsIn" 
             channel="inbound" 
             extract-payload="false" 
             container="messageListenerContainer" /> 


    <int:service-activator input-channel="inbound" 
         output-channel="outbound" 
         ref="messageReader" 
         method="onMessage" /> 

</beans> 


-- Message Reader Class 

import javax.jms.JMSException; 

import org.springframework.integration.annotation.ServiceActivator; 
import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 

@Component("messageReader") 
public class MessageReader 
{ 

@ServiceActivator 
public void onMessage(Message inboundMessage) { 

    System.out.println(" -------Message Read Start--------"); 

    System.out.println(inboundMessage.getHeaders()); 

    System.out.println(" -------Message Headers Reading completed--------"); 

    System.out.println("payload-->" + inboundMessage.getPayload().getClass()); 
    String payload = inboundMessage.getPayload().toString(); 
    System.out.println("payload value-->" + payload); 


    org.apache.activemq.command.ActiveMQTextMessage obj = (org.apache.activemq.command.ActiveMQTextMessage)inboundMessage.getPayload(); 
    System.out.println("Object-->" + obj); 

    String var = null; 
    try { 
     var = obj.getText(); 
     System.out.println("Datastructure-->" + obj.getText()); 
    } catch (JMSException e) { 

     e.printStackTrace(); 
    } 

} 

} 


---- Message Writer Class 

@Component("sendMessage") 
public class SendMessage { 

@Autowired 
private MessageChannel outbound; 


public void send(String name) 
{ 
    Entity entity = new Entity(1,"anuj"); 

    Message<Entity> message = MessageBuilder.withPayload(entity) 
           .setHeader("Message_Header1", "Message_Header1_Value") 
           .setHeader("Message_Header2", "Message_Header2_Value") 
           .build(); 

    outbound.send(message); 
} 

} 


-- Application main class 
public class App { 

public static void main(String[] args) 
{ 
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:7_applicationContext.xml"); 

    SendMessage sendMessage = (SendMessage)applicationContext.getBean("sendMessage", SendMessage.class); 

    for(int i=0;i<10;i++){ 
     sendMessage.send("This is Message Content"); 
    } 

    applicationContext.registerShutdownHook(); 
} 

} 

答えて

0

あなたはCachingConnectionFactoryで接続ファクトリをラップした場合、すべての消費者によって共有される1つの接続が存在します。

コンシューマーは、需要に基づいてconcurrentConsumersとの間のコンテナによって調整されます。活動の爆発後、消費者が減少するまでには時間がかかるでしょう。

0

ゲーリー、あなたは以下のような意味ですか?以下のコードも接続を解放していません:

<bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false"> 
    <property name="environment"> 
     <props> 
      <prop key="java.naming.provider.url">tcp://localhost:61616</prop> 
      <prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop> 
      <prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop> 
      <prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop> 
      <prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop> 
      <prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop> 
     </props> 
    </property> 
</bean> 

<bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false"> 
    <property name="jndiTemplate" ref="remoteJndiTemplate"/> 
    <property name="jndiName" value="QueueConnectionFactory"/> 
    <property name="lookupOnStartup" value="true" /> 
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" /> 
</bean> 

<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="remoteConnectionFactory" /> 
    <property name="sessionCacheSize" value="10" /> 
</bean> 


    <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
    <property name="connectionFactory" ref="jmsConnectionFactory"/> 
    <property name="destination" ref="sourceQueue"/> 
    <property name="maxConcurrentConsumers" value="10"/> 
    <property name="concurrentConsumers" value="1"/> 
    <property name="autoStartup" value="true"/>   
</bean> 

<int:channel id="inbound"/> 

<int-jms:message-driven-channel-adapter id="jmsIn" 
             channel="inbound" 
             extract-payload="false" 
             container="messageListenerContainer" /> 
関連する問題