2017-04-10 11 views
0

ActiveMQトピックからのメッセージを消費するtomcatのクラスタがあります。今、クラスター内のTomcatの1つがダウンしたら、私は消費者の数が1になると推測しています。ActiveMQトピックの消費者の変化を検出する

これで、そのトピックのコールバックまたはリスナーを使用して変更を検出したいと思います。それは実現可能ですか?

Region.getDestinations(ActiveMQDestination)のようになりますか?

答えて

0

アドバイスメッセージは必要なものです。

このコードでメッセージを受け取るたびに、のコンシューマーが開始または停止していることを意味します。

ドキュメントhttp://activemq.apache.org/advisory-message.html

例:

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Queue; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.command.ConsumerInfo; 
import org.apache.activemq.command.RemoveInfo; 

public class AdvisorySupportConsumerAdvisoryTopic { 

    public static void main(String[] args) throws JMSException { 
     Connection conn = null; 
     try { 
      ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671"); 
      conn = cf.createConnection("admin", "admin"); 
      ActiveMQSession session = (ActiveMQSession) conn.createSession(false, 
        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
      conn.start(); 
      Queue q = session.createQueue("Q"); 
      Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(q); 
      MessageConsumer consumer = session.createConsumer(advisoryDestination); 
      consumer.setMessageListener(new MessageListener() { 
       @Override 
       public void onMessage(Message msg) { 
        if (msg instanceof ActiveMQMessage) { 
         try { 
          ActiveMQMessage aMsg = (ActiveMQMessage) msg; 
          System.out.println(aMsg.getStringProperty("consumerCount")); 
          System.out.println(aMsg.getStringProperty("producerCount")); 
          if (aMsg.getDataStructure() instanceof ConsumerInfo) { 
           // Consumer start 
           ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure(); 
           System.out.println(consumerInfo); 
          } else if (aMsg.getDataStructure() instanceof RemoveInfo) { 
           // Consumer stop 
           RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure(); 
           System.out.println(removeInfo); 
          } 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 
} 

あなたは、これはあなたが接続開始や停止を持っていることを意味し、このコードを使用してメッセージを得たたびに。

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.command.ConnectionInfo; 
import org.apache.activemq.command.RemoveInfo; 

public class AdvisorySupportConnectionAdvisoryTopic { 

    public static void main(String[] args) throws JMSException { 
     Connection conn = null; 
     try { 
      ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671"); 
      conn = cf.createConnection("admin", "admin"); 
      ActiveMQSession session = (ActiveMQSession) conn.createSession(false, 
        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
      conn.start(); 
      Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConnectionAdvisoryTopic(); 
      MessageConsumer consumer = session.createConsumer(advisoryDestination); 
      consumer.setMessageListener(new MessageListener() { 
       @Override 
       public void onMessage(Message msg) { 
        if (msg instanceof ActiveMQMessage) { 
         try { 
          ActiveMQMessage aMsg = (ActiveMQMessage) msg; 
          if (aMsg.getDataStructure() instanceof ConnectionInfo) { 
           // Connection start 
           ConnectionInfo connectionInfo = (ConnectionInfo) aMsg.getDataStructure(); 
           System.out.println(connectionInfo); 
          } else if (aMsg.getDataStructure() instanceof RemoveInfo) { 
           // Connection stop 
           RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure(); 
           System.out.println(removeInfo); 
          } 
         } catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 
} 
関連する問題