2017-05-23 14 views
0

背景: 標準のプロデューサコンシューマキューがあり、プロデューサが高速である間はコンシューマが遅くなります。予想は、プロデューサが要求されたメッセージを完了するたびにメッセージを確認し、プロデューサはメッセージに関連付けられたタスクが完了したとみなします。プロデューサは高速ですので、スレッドを必要としません。メッセージを確認するたびに、コールバックを呼び出す必要があります。 JMSはこのフロントでは限られているので、できるだけ直接ActiveMQMessageProducerのようなActiveMQクラスを使用しています。ActiveMQ実装の非同期確認JAVA 8

問題: メッセージが自動認識されています。登録済みの非同期コールバックは、コンシューマーがまだ開始していなくても呼び出されています。 public void send(Destination destination, Message message, AsyncCallback onComplete)

プロデューサー

public static boolean setup() {  
     Producer.connectionFactory = new 
     ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 
     // Create a Connection 
     Producer.connection = 
      (ActiveMQConnection)connectionFactory.createConnection(); 
     connection.setAlwaysSessionAsync(true); 
     connection.start();   
    } 

public Producer() { 
     session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
     destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
     producer = (ActiveMQMessageProducer)session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     } 
... 

public void run() { 
     long id = messageID.getAndIncrement();   
     String text = "Hello world!" 
     Message message = session.createTextMessage(text); 
     producer.send(message, new MessageCompletion(id, this.messageRundown)); 
    } 

消費者

public static boolean setup() {  
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");  
    Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection(); 
    connection.setAlwaysSessionAsync(true);   
    return true; 
} 

public Consumer() { 
    session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
    destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
    consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); 
    consumer.setMessageListener(this); 
    connection.start(); 
} 

// implements MessageListener 
@Override 
public void onMessage(Message message) {  
    messageQueue.add(message); 
} 
public void run() { 
    while(true) { 
     Message message = messageQueue.poll(); 
     while(message != null) { 
      // do some work    
      message.acknowledge(); 
      message = messageQueue.poll();    
     } 
     Thread.sleep(10000);    
    } 
} 

消費者は、私は参考のためにそれを追加している必要はありませんが、スタッフはこれがの一部であり、簡潔さを確保するために削除されましたワーキングコード。

答えて

1

ご了解の仕方が間違っています。送信者の非同期コールバックは、ブローカがメッセージを受信したことを通知するだけです。永続的な送信の場合、コールバックはメッセージがディスクに書き込まれたことを示します。

JMSまたは他のほとんどのメッセージブローカーには、プロデューサとコンシューマの結合はありません。プロデューサはキューにメッセージを配置し、消費者はいつでも来てそのキューから消費することができます。両者の結合はなく、プロデューサーは次のメッセージを出す前に消費者を待つことはできません。

特定のメッセージが処理されるタイミングを知りたい場合は、JMS Request/Responseスタイルのメッセージングパターンを調べる必要があります。

+0

ありがとうございます!私は今これが良いと理解していますが、非同期コールバック実装のJMSExceptionハンドラも今追加されています。だから、高いパーフォーマンス、永続的なメッセージキューのために、それは我々がその側面のためにあまりにも多くの時間を過ごされていないことを確認するために非同期を保つのが理にかなっていますか? – amritanshu

+0

永続メッセージの場合は、各メッセージが永続化されることをいつ実際に保証するかを知りたいかどうかを判断する必要はありません。 –

関連する問題