私は次のように私が達成しようとしているトポロジがあり、埋め込まれたブローカーのネットワークを作成しようとしています:ネットワーク接続
だから私はしたいですすべてのメッセージの最初の受信者となるブローカ1を実行し、トピック%X%を保持します。次に、Broker 2とBroker 3をBroker 1に接続し、ネットワーク接続を介してBroker 1を待機させたいとします。 は最終的に私は消費者がブローカー2とブローカーへの接続%のX%のトピックからメッセージを受信できるようにしたい、私は次のコードを書いてきたこれまでのところ、3
:
ブローカー1:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
を
ブローカー2:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61617");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
ブローカー3:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61618");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
プロデューサー:
public class Producer {
private Connection connection;
public Producer() throws JMSException {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
....
}
public void produceMessage(int x) {
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("Testtopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world " + x + "! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
session.close();
}
......
}
}
消費者:
public class Consumer {
public Consumer() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic"));
consumer.setMessageListener(new HelloMessageListener());
}
private static class HelloMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Consumer " + Thread.currentThread().getName() + " received message: " + textMessage.getText());
.......
}
}
}
しかし、私はTCPに接続するとき:// localhostを:消費者が61618(ブローカー3です)、私は何のメッセージも受け取ることができません。その間に直接 tcp:// localhost:61616(最初の受信者ブローカー1)に接続すると、コンシューマーはメッセージを受け取り、すべてうまく行きます。私はコネクター構成で何かを逃したと思う。これで私を助けてくれますか?
おかげで、
乾杯
networkBridgesがアクティブになります? ActiveMQからのログメッセージはありますか? –
@MattPavlovichスレッド「main」でExceptionを送信します。javax.jms.JMSException:ワイヤ形式のネゴシエーションのタイムアウト:ピアがワイヤ形式を送信しませんでした。 –