2017-02-26 9 views
0

以下のコードを使用して、複数のコンシューマがメッセージを消費するための複数のJMSセッションを作成しています。私の問題は、コードが単一スレッドの方法で実行されていることです。 Queueにメッセージが存在しても、2番目のスレッドは何も受信できず、ポーリングを保持します。第1のスレッドは、第1のバッチの処理を終了し、戻って残りのメッセージを消費する。ここでの使用方法に問題はありますか?マルチスレッドJMSクライアントActiveMQ

static { 
    try { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616"); 
     connection = connectionFactory.createConnection(); 
     connection.start(); 
    } catch (JMSException e) { 
     LOGGER.error("Unable to initialise JMS Queue.", e); 
    } 

} 

public JMSClientReader(boolean isQueue, String name) throws QueueException { 

    init(isQueue,name); 
} 

@Override 
public void init(boolean isQueue, String name) throws QueueException 
{ 

    // Create a Connection 
    try { 
     // Create a Session 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     if (isQueue) { 
      destination = new ActiveMQQueue(name);// session.createQueue("queue"); 
     } else { 
      destination = new ActiveMQTopic(name);// session.createTopic("topic"); 
     } 
     consumer = session.createConsumer(destination); 
    } catch (JMSException e) { 
     LOGGER.error("Unable to initialise JMS Queue.", e); 
     throw new QueueException(e); 
    } 
} 

public String readQueue() throws QueueException { 

    // connection.setExceptionListener(this); 
    // Wait for a message 
    String text = null; 
    Message message; 
    try { 
     message = consumer.receive(1000); 
     if(message==null) 
      return "done"; 
     if (message instanceof TextMessage) { 
      TextMessage textMessage = (TextMessage) message; 
      text = textMessage.getText(); 
      LOGGER.info("Received: " + text); 
     } else { 
      throw new JMSException("Invalid message found"); 
     } 
    } catch (JMSException e) { 
     LOGGER.error("Unable to read message from Queue", e); 
     throw new QueueException(e); 
    } 


    LOGGER.info("Message read is " + text); 
    return text; 

} 
+0

あなたがメッセージを消費する複数の消費者にしたい場合は、代わりに、キューのトピックを使用しています。 http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html – Rjiuk

+0

私の場合は、待ち行列に入るリスナーの数を増やそうとしています。現在トピックを使用したくない。 – HariJustForFun

答えて

4

問題はprefetchPolicyです。

persistent queues (default value: 1000) 
non-persistent queues (default value: 1000) 
persistent topics (default value: 100) 
non-persistent topics (default value: Short.MAX_VALUE - 1) 

すべてのメッセージ

は、最初の接続、消費者に派遣された、別の1つが接続するとき、彼は、メッセージを受信して​​いないあなたは、あなたがより低い値にprefetchPolicyを設定する必要があるキューの同時消費者を持っている場合ので、この動作を変更しますデフォルトよりも。例えばactivemq.xmlでURI設定にこの jms.prefetchPolicy.queuePrefetch=1を追加したり、クライアントのURLでそれを設定し、このよう

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1"); 

大プリフェッチ値が高い メッセージ量と高いパフォーマンスのために推奨されています。ただし、メッセージの量が少なくても、各メッセージの処理に時間がかかります。 これにより、コンシューマは一度に1つのメッセージしか処理できません。 ただし、0のプリフェッチ制限を指定すると、 がコンシューマにプッシュされるのではなく、コンシューマ が一度に1つずつメッセージをポーリングするようになります。

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

を見て、

http://activemq.apache.org/destination-options.html

+0

それは働いた。優れた答えをありがとう。これで私はナッツになってしまった。 – HariJustForFun

関連する問題