背景: 標準のプロデューサコンシューマキューがあり、プロデューサが高速である間はコンシューマが遅くなります。予想は、プロデューサが要求されたメッセージを完了するたびにメッセージを確認し、プロデューサはメッセージに関連付けられたタスクが完了したとみなします。プロデューサは高速ですので、スレッドを必要としません。メッセージを確認するたびに、コールバックを呼び出す必要があります。 JMSはこのフロントでは限られているので、できるだけ直接ActiveMQMessageProducer
のようなActiveMQクラスを使用しています。ActiveMQ実装の非同期確認JAVA 8
問題: メッセージが自動認識されています。登録済みの非同期コールバックは、コンシューマーがまだ開始していなくても呼び出されています。 public void send(Destination destination, Message message, AsyncCallback onComplete)
プロデューサー
public static boolean setup() {
Producer.connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// Create a Connection
Producer.connection =
(ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
connection.start();
}
public Producer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
producer = (ActiveMQMessageProducer)session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
...
public void run() {
long id = messageID.getAndIncrement();
String text = "Hello world!"
Message message = session.createTextMessage(text);
producer.send(message, new MessageCompletion(id, this.messageRundown));
}
消費者
public static boolean setup() {
Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
return true;
}
public Consumer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
consumer.setMessageListener(this);
connection.start();
}
// implements MessageListener
@Override
public void onMessage(Message message) {
messageQueue.add(message);
}
public void run() {
while(true) {
Message message = messageQueue.poll();
while(message != null) {
// do some work
message.acknowledge();
message = messageQueue.poll();
}
Thread.sleep(10000);
}
}
消費者は、私は参考のためにそれを追加している必要はありませんが、スタッフはこれがの一部であり、簡潔さを確保するために削除されましたワーキングコード。
ありがとうございます!私は今これが良いと理解していますが、非同期コールバック実装のJMSExceptionハンドラも今追加されています。だから、高いパーフォーマンス、永続的なメッセージキューのために、それは我々がその側面のためにあまりにも多くの時間を過ごされていないことを確認するために非同期を保つのが理にかなっていますか? – amritanshu
永続メッセージの場合は、各メッセージが永続化されることをいつ実際に保証するかを知りたいかどうかを判断する必要はありません。 –