2016-10-07 65 views
0

私は次のように私が達成しようとしているトポロジがあり、埋め込まれたブローカーのネットワークを作成しようとしています:ネットワーク接続

enter image description here

だから私はしたいですすべてのメッセージの最初の受信者となるブローカ1を実行し、トピック%X%を保持します。次に、Broker 2とBroker 3をBroker 1に接続し、ネットワーク接続を介してBroker 1を待機させたいとします。 は最終的に私は消費者がブローカー2とブローカーへの接続%のX%のトピックからメッセージを受信できるようにしたい、私は次のコードを書いてきたこれまでのところ、3

ブローカー1:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://localhost:61616"); 
broker.addNetworkConnector("static:(tcp://localhost:61616)"); 
broker.start(); 

ブローカー2:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://localhost:61617"); 
broker.addNetworkConnector("static:(tcp://localhost:61616)"); 
broker.start(); 

ブローカー3:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://localhost:61618"); 
broker.addNetworkConnector("static:(tcp://localhost:61616)"); 
broker.start(); 

プロデューサー:

public class Producer { 

    private Connection connection; 

    public Producer() throws JMSException { 
     // Create a ConnectionFactory 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://localhost:61616"); 

     connection = connectionFactory.createConnection(); 
     connection.start(); 
     .... 

    } 

    public void produceMessage(int x) { 
     try { 
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Destination destination = session.createTopic("Testtopic"); 
      MessageProducer producer = session.createProducer(destination); 
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
      String text = "Hello world " + x + "! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); 
      TextMessage message = session.createTextMessage(text); 
      System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); 

      producer.send(message); 
      session.close(); 
     } 
     ...... 
    } 
} 

消費者:

public class Consumer { 
    public Consumer() throws JMSException { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic")); 
     consumer.setMessageListener(new HelloMessageListener()); 

    } 

    private static class HelloMessageListener implements MessageListener { 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       System.out.println("Consumer " + Thread.currentThread().getName() + " received message: " + textMessage.getText()); 
       ....... 
     } 

    } 
} 

しかし、私はTCPに接続するとき:// localhostを:消費者が61618ブローカー3です)、私は何のメッセージも受け取ることができません。その間に直接 tcp:// localhost:61616(最初の受信者ブローカー1)に接続すると、コンシューマーはメッセージを受け取り、すべてうまく行きます。私はコネクター構成で何かを逃したと思う。これで私を助けてくれますか?

おかげで、

乾杯

+0

networkBridgesがアクティブになります? ActiveMQからのログメッセージはありますか? –

+0

@MattPavlovichスレッド「main」でExceptionを送信します。javax.jms.JMSException:ワイヤ形式のネゴシエーションのタイムアウト:ピアがワイヤ形式を送信しませんでした。 –

答えて

0

networkConnectorsは、URIに間違ったポートを持つように見えます。

は次のようになります。 .addNetworkConnector(TCP:// localhostを:61617) .addNetworkConnector broker1で

(TCP:// localhostを:61618)あなたはネットワークコネクタを必要としない

2から、3バック1

私も完全なオブジェクトを設定し、設定を強化するためにいくつかのパラメータを追加することをお勧めする...二重など、networkTTL = 1、=「false」を

+0

ありがとう、Matt!次のメッセージが表示されます。 スレッド "main"の例外javax.jms.JMSException:ワイヤ形式のネゴシエーションのタイムアウト:ピアがワイヤ形式を送信しませんでした。 このスキームを動作させるために、私はBroker2側にいくつかのコードを追加すべきだと思います。私は、Broker2とBroker2の間にネットワークチャネルが存在すると明示的に述べるコードを意味します。問題を調査する –

関連する問題