が解決取得しています:RabbitMQに直接返信してください。私はAlreadyClosedException
channel.basicPublishを移動します( ""、QUEUE、小道具、message.getBytes());
channel.basicConsume以下
(REPLYQUEUE、...)問題を修正
。
私はRabbitMQダイレクト返信機能を使用する方法を理解しようとしています。 documentationは実装方法がかなり曖昧なので、RPCの例を使用して、直接reply-toを使用するようにしました。
private final static String QUEUE = "Test_chan";
private void directReplyToClient(ConnectionFactory factory) {
Connection connection = null;
Channel channel = null;
String replyQueue;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//replyQueue = channel.queueDeclare().getQueue();
replyQueue = "amq.rabbitmq.reply-to";
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.replyTo(replyQueue)
.build();
String message = "Hello World";
channel.basicPublish("", QUEUE, props, message.getBytes());
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
response.offer(new String(body, "UTF-8"));
}
});
System.out.println(response.take());
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (channel != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException | TimeoutException _ignore) {}
}
}
channel.queueDeclare()。getQueue()
作品への返信アドレスを設定しますが、
スレッド「メイン」com.rabbitmq.client.AlreadyClosedExceptionでに設定しますamq.rabbitmq.reply- 〜
例外:
は、次の例外を与える チャンネルはすでに、チャネルエラーのために閉じられています。プロトコル方式: 法(返信先コード= 406、返信テキストを= PRECONDITION_FAILED - 高速応答、消費者が存在しない場合、クラスID = 60、方法-ID = 40)
私はどこ誰が見るん何か間違っている?すべてのポインタが評価されます。
例外が私の下にあるチャネルを閉じることを試みていることが分かりました。だから私は既に閉鎖されたチャンネルを閉鎖しようとしている。なぜそれは既に閉鎖されていますか? – Orjanp
私は突然それを理解しました。 basicPublic呼び出しをbasicConsume呼び出しの下に移動するだけで、それは魅力的に機能します。 – Orjanp