私はRabbitMQでJavaで作業しています。
私は同じ構成の2つのRabbitMQサーバーを持っています.1つは開発環境で、もう1つは運用環境です。
これは、消費者の宣言です:java - RabbitMQコンシューマーはメッセージを受信しません
/*
* Connection and channel declaration
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(prop.getProperty("ConnectionURI"));
connection = factory.newConnection();
channel = connection.createChannel();
/*
* Queue declaration and exchange binding
*/
channel.exchangeDeclare(prop.getProperty("printExchange"), "topic", false, false, false, new HashMap<>());
queueName = prop.getProperty("printQueue");
routing_key = "print." + codCliente + "." + idCassa;
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, prop.getProperty("printExchange"), routing_key);
そして、ここではそれがキューで待機するように開始します:本番環境では、私が持っている間、開発環境では
JAyronPOS.LOGGER.info("Waiting for a message on the queue -> " + queueName + " with routingkey -> " + routing_key);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
JAyronPOS.LOGGER.info("This is the received message -> " + queueName + ": " + new String(body, "UTF-8"));
Map<String, Object> headers = properties.getHeaders();
if (envelope.getRoutingKey().equals(routing_key)) {
JAyronPOS.LOGGER.info("Message is for me, because it has my routing key");
channel.basicAck(envelope.getDeliveryTag(), false);
if (headers != null) {
if (headers.containsKey("command")) {
JAyronPOS.LOGGER.info("It's a command!");
JAyronPOS.LOGGER.info(headers.get("command").toString());
if ("requestClose".equals(headers.get("command").toString())) {
ChiusuraFiscaleConfirm confirm = gson.fromJson(new String(body, "UTF-8"), ChiusuraFiscaleConfirm.class);
if (confirm.isCanClose()) {
eseguiChiusuraFiscale();
} else {
JOptionPane.showMessageDialog(null, "Can't close", "Error", JOptionPane.ERROR_MESSAGE);
}
} else {
JAyronPOS.LOGGER.info("Can't handle the message");
}
}
} else {
System.out.println("It's a ticket");
TicketWrapper ticket = gson.fromJson(new String(body, "UTF-8"), TicketWrapper.class);
printTicket(ticket);
}
}else{
JAyronPOS.LOGGER.info("The message isn't for me, because it has the routingkey: "+envelope.getRoutingKey());
}
}
};
channel.basicConsume(queueName, false, consumer);
、私は、最大5つのキューを持っています150〜200のキューの間。
メッセージは、個人のrouting_keyを使用して、Exchangeによって送信されます。送信されたメッセージの数は多くありません(強調されている間は10 msg/sを超えない)。
私が開発環境でコンシューマーをテストすると、すべてがOKです。
- 私はRPC呼び出しを送信し、サーバーはそれを処理して応答します。消費者は返信を読んで適切な方法を呼び出す。約1〜2秒ですべて。
私が本稼働環境でソフトウェアを使用すると(config.propertiesファイル内の行をコメント化/デリートするだけで環境を変更する)、動作しません。
- RPC呼び出しを送信し、サーバーがそれを処理します応答をキューに送信します。消費者はメッセージを受け取ることはありません(ただし、Web管理パネルでキューに表示されたメッセージを見ることができます)。
これは問題がありますか?
編集:私は、私は応答キューに、RabbitMQのウェブパネルで、RPCコールを送信する場合、私は3-4 RPCコールを送信する場合ながら(水色)「を配信」の下のメッセージがあることに気づい (以前のものと同じ)、いくつかの呼び出しの後、返信キューにメッセージがパブリッシュ(黄色)の下にあり、コンシューマーが応答を受け取ります。