2017-01-07 9 views
0

私は、ボイラープレートのJMSコードを使用してActiveMQサーバーとの間でメッセージを送受信するJavaアプリケーションを使用しています。JMS/ActiveMQ MessageConsumer.recieve()が返されない

this.consumerFactory = new ActiveMQConnectionFactory(this.ingestItemBrokerUrl); 
this.consumerConnection = this.consumerFactory.createConnection(); 
this.consumerConnection.start(); 
this.consumerSession = this.consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
this.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName()); 
this.consumer = this.consumerSession.createConsumer(this.consumerDestination); 

私のアプリケーションは、(接続が閉じられるまで)は、ActiveMQのトピックに到着したメッセージを処理するループでMessageConsumer.receive()を呼び出すには:

:ここ

message = this.consumer.receive(); 

を謎です私はlocalhostで動作しているActiveMQサーバーに接続しますが、これは期待通りに機能します。しかし、Azureクラウドマシン(BitnamiのActiveMQスタックでロードされている)で稼動しているActiveMQサーバーに接続すると、コールは無期限にブロックされますが、AMQ管理コンソールからクライアントがメッセージの接続とデキューを行っていることがわかります。

なぜ、ローカルサーバーからリモートサーバーに切り替えるときに違う動作が見られるのですか?どうすればトラブルシューティングをさらに進めることができますか?

マイクラウドactivemq.xml設定ファイルは以下の通りです:

<beans xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor"> 
<property name="algorithm" value="PBEWithMD5AndDES"/> 
<property name="password" value="**REDACTED**"/> 
</bean> 

<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer"> 
    <constructor-arg ref="configurationEncryptor"/> 
    <property name="location" value="file:${activemq.conf}/credentials-enc.properties"/> 
</bean> 

<!-- Allows accessing the server log --> 
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> 
</bean> 

<!-- 
    The <broker> element is used to configure the ActiveMQ broker. 
--> 
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> 
    <plugins> 
     <!--simpleAuthenticationPlugin> 
      <users> 
       <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="admins"/> 
      </users> 
     </simpleAuthenticationPlugin--> 

     <!-- if not already set, set ttl to 1 minutes --> 
     <timeStampingBrokerPlugin zeroExpirationOverride="60000"/> 

    </plugins> 

    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
      <policyEntry topic="&gt;" producerFlowControl="true" memoryLimit="1mb"> 
       <pendingSubscriberPolicy> 
       <vmCursor/> 
       </pendingSubscriberPolicy> 
      </policyEntry> 
      <policyEntry queue="&gt;" producerFlowControl="true" memoryLimit="1mb"> 
       <!-- Use VM cursor for better latency 
        For more information, see: 

        http://activemq.apache.org/message-cursors.html 

       <pendingQueuePolicy> 
       <vmQueueCursor/> 
       </pendingQueuePolicy> 
       --> 
      </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 


    <!-- 
     The managementContext is used to configure how ActiveMQ is exposed in 
     JMX. By default, ActiveMQ uses the MBean server that is started by 
     the JVM. For more information, see: 

     http://activemq.apache.org/jmx.html 
    --> 
    <managementContext> 
     <managementContext createConnector="false"/> 
    </managementContext> 

    <!-- 
     Configure message persistence for the broker. The default persistence 
     mechanism is the KahaDB store (identified by the kahaDB tag). 
     For more information, see: 

     http://activemq.apache.org/persistence.html 
    --> 
    <persistenceAdapter> 
     <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 


     <!-- 
     The systemUsage controls the maximum amount of space the broker will 
     use before slowing down producers. For more information, see: 
     http://activemq.apache.org/producer-flow-control.html 
     If using ActiveMQ embedded - the following limits could safely be used: 

    <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage limit="20 mb"/> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="1 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="100 mb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 
    --> 
     <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage limit="64 mb"/> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="1 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="100 mb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

    <!-- 
     The transport connectors expose ActiveMQ over a given protocol to 
     clients and other brokers. For more information, see: 

     http://activemq.apache.org/configuring-transports.html 
    --> 
    <transportConnectors> 
     <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
     <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 


     <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;trace=true&amp;needClientAuth=true"/> 
    </transportConnectors> 

        <!-- SSL Configuration Context --> 
      <sslContext> 
       <sslContext keyStore="file:${activemq.conf}/amq-server.ks" 
          keyStorePassword="**REDACTED**" 
             trustStore="file:${activemq.conf}/amq-server.ts" 
        trustStorePassword="**REDACTED**" /> 
      </sslContext> 


</broker> 

<!-- 
    Enable web consoles, REST and Ajax APIs and demos 
    The web consoles requires by default login, you can disable this in the jetty.xml file 

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details 
--> 
<import resource="jetty.xml"/> 

</beans><!-- END SNIPPET: example --> 
+0

受信メソッドは、メッセージが正常に受信されるまで無期限にブロックされるように設計されています。消費者が消費していないときにトピックにメッセージを送信しましたか?クラウドの設定は異なりますか?あなたはAMQクラウドの設定を投稿できます –

+0

私は 'receive()'がメッセージが到着するまでブロックするはずだと理解します。 locahostを使用すると、メッセージを送信するまでブロックされます。クラウドAMQを使用すると、メッセージを送信した後でもブロックされます。 –

+0

クラウドAMQ –

答えて

0

あなたは宛先名としてthis.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName() + "?consumer.prefetchSize=1");

これを使用してみてくださいまたは

consumerFactory.getPrefetchPolicy().setTopicPrefetch(1); 

を呼び出すことができますメッセージリスナーで試してみてください

MessageListener listner = new MessageListener() { 
     public void onMessage(Message message) { 
      try { 
       if (message instanceof TextMessage) { 
        TextMessage textMessage = (TextMessage) message; 
        System.out.println("Received message" 
          + textMessage.getText() + "'"); 
       } 
      } catch (JMSException e) { 
       System.out.println("Caught:" + e); 
       e.printStackTrace(); 
      } 
     } 
    }; 

    consumer.setMessageListener(listner); 
+0

私は最初の提案を試みました。同じ結果:recieve()は返さない。 –

+0

2番目の提案はできません。私のconsumerFactoryには 'getPrefetchPolicy'メソッドはありません。 –

+0

あなたのクラウドバージョンは4.xですか? –