0
【RabbitMQ:プロデューサーからのメッセージをコンシューマーが受信できない】こんにちは。RabbitMQ でメッセージを送信しようとしています。プロデューサー側でダイレクトエクスチェンジ(direct exchange)を宣言し、キューにバインドしてルーティングしていますが、コンシューマー側でメッセージを受信できません。コンシューマーのコードは次のとおりです。
package com.rabbit.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
 * Minimal RabbitMQ consumer: connects to a broker on localhost, subscribes to
 * the {@code credit1} queue and prints every message it receives, acknowledging
 * each one manually.
 *
 * <p>The connection is intentionally left open so the process keeps receiving
 * messages until it is terminated (CTRL+C).
 */
public class Consumer {
    private final static String QUEUE_NAME = "credit1";

    /**
     * Starts the consumer.
     *
     * @param args unused
     * @throws IOException      if the broker cannot be reached or an AMQP operation fails
     * @throws TimeoutException if establishing the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Manual acknowledgement: each delivery is acked explicitly in handleDelivery.
        boolean autoAck = false;

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // Declare the queue before consuming so that this program also works when
        // it is started before the producer; consuming from a queue that does not
        // exist is rejected by the broker. The arguments (durable, non-exclusive,
        // no auto-delete) must match the producer's declaration, otherwise the
        // broker refuses the re-declaration.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
                new DefaultConsumer(channel) {
                    /** Prints the message body and its routing key, then acks the delivery. */
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        long deliveryTag = envelope.getDeliveryTag();
                        String message = new String(body, "UTF-8");
                        System.out.println(" [x] Received '" + message + "'" + "Routing Key : " + routingKey);
                        // multiple=false: acknowledge only this single delivery.
                        channel.basicAck(deliveryTag, false);
                    }

                    /** Logs the reason when the consumer is shut down. */
                    @Override
                    public void handleShutdownSignal(String consumerTag,
                                                     ShutdownSignalException sig) {
                        System.out.println("Recieving Ended");
                        System.out.println("Channel Closed : " + sig.getMessage());
                    }
                });
    }
}
なお、コンシューマーのコードには、次の行も含まれていました。
channel.queueBind(QUEUE_NAME, "offerExchange", "credit");
そして、プロデューサーのコードは次のとおりです。
package com.rabbit.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.Exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Producer {
private static final String QUEUE_NAME = "credit1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("offerExchange", "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, "offerExchange", "credit");
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
BasicProperties properties,
byte[] body) throws IOException {
// TODO Auto-generated method stub
System.out.println("Message Failed" + replyText);
}
});
String message = "Hello rabbit just hop !";
/* for(int i=1;i<=100000;i++){*/
channel.basicPublish("offerExchange", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
/* }*/
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}