2011-12-20 15 views
2

恒久サブスクライバを持つトピックがあります。メッセージを公開して消費することはできますが、トピックからメッセージを読むのには少し時間がかかることがわかります。JMSからメッセージを消費している間に遅延が発生するトピック

1回の呼び出しでメッセージを読み取ることができません。メッセージを読むためにメソッドを複数回呼び出す必要があります。私は何か不足していますか?

private void publishMessage() { 
     TopicConnection topicConnection = null; 
     TopicSession topicSession = null; 
     TopicPublisher topicPublisher = null; 
     try { 
      topicConnection = connectionFactory.createTopicConnection(); 
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 
      Topic topicName= topicSession.createTopic(topicName); 
      topicPublisher = topicSession.createPublisher(topicName); 
      ObjectMessage message = topicSession.createObjectMessage(customObject) 
      message.setStringProperty("user", userProperty); 
      topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive); 
     } catch (JMSException e) { 
      throw new RuntimeException("Error Sending UMessage", e); 
     } finally { 
      closeConnections(null, topicPublisher, topicSession, topicConnection); 
     } 
    } 

public void consumeMessages(String userId, int maxResults) { 
    TopicConnection topicConnection = null; 
    TopicSession topicSession = null; 
    TopicSubscriber topicSubscriber = null; 

    try { 
     topicConnection = connectionFactory.createTopicConnection("guest","guest"); 
     topicConnection.setClientID("topic"); 
     topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 
     Topic topicName= topicSession.createTopic(topicName); 
     topicSubscriber = topicSession.createDurableSubscriber(topicName, "subscriptionname", String.format("user = '%s'", userName), false); 
     topicConnection.start(); 
     Message msg = null; 

     do { 
     msg = topicSubscriber.receiveNoWait(); 
     if (msg instanceof ObjectMessage) { 
      ObjectMessage om = (ObjectMessage) msg; 
     else { 
      log.error(String.format(" %s", om.getObject().getClass().getSimpleName())); 
      } 
     } else if (msg != null) { 
      log.error(String.format("e %s", msg.getClass().getSimpleName())); 
     } 
     } while (msg != null && out.size() <= maxResults); 
    } catch (JMSException e) { 
     throw new RuntimeException("Error retrieving User Messages", e); 
    } finally { 
     closeConnections(topicSubscriber, null, topicSession, topicConnection); 
    } 
    return out; 
} 

答えて

0

あなたは一度に1つのメッセージを取得するreceiveNoWait()を呼び出しています。通常、JMSプロバイダによっては、JMSクライアントが多数のメッセージを一度に取得し、クライアント側でキャッシュしてネットワーク待ち時間を短縮するということがあります。受信を呼び出すと、このキャッシュからメッセージを受け取り、それをあなたに渡します。

長い遅延が発生している場合は、これらのメッセージをどのようにトピックに追加するかに問題があるか、各メッセージを処理している間にメッセージ受信をブロックしています。処理中にメッセージ受信をブロックしない場合は、受信メソッドを使用する代わりにMessageListenerインターフェイスを実装するか、受信メソッドからメッセージを取り出してスレッドプールで非同期に処理することができます。

あなたが消費者を作成するときに、あなたがそうのようにリスナーを追加することができます。

MessageListener listener = new MyListener(); 
consumer.setMessageListener(listener); 

は、次にメッセージを処理したり、既存の消費者クラスのインターフェイスを実装するクラスを作成します。

public class MyListener implements MessageListener { 
    public void onMessage(Message message) 
    { 
    TextMessage text = (TextMessage) message; 

    System.out.println("Message: " + text.getText()); 
    } 
} 
+0

はい、私はサブスクライバのすべての利用可能なメッセージを取得したい。これは、指定された数のメッセージに対してループする理由です。それはすべてのメッセージを読んでいますが、出版直後ではありません.5〜6分かかります。デバッグモードでjbossを実行すると、私は奇妙な動作を観察しました。ブレークポイントがあるとすぐにメッセージを読み込んでいます。しかし、通常モードで実行すると、読み込むのに時間がかかります。 – John

+0

JBossについて言及して以来、私はあなたがJMSプロバイダとしてアプリケーションサーバの内部でJBoss Messagingを使用していると仮定していますか? – gregwhitaker

+0

はい、そうです。 JBossが提供するJMSサーバを使用しています。私は設定がありませんか? – John

関連する問題