2011-01-06 4 views
20

メッセージを生成してJMSキューを介して一意のコンシューマに送信するJMSクライアントがあります。JMS - 1人から複数のコンシューマに移動する

私が望むのは、複数の消費者がこれらのメッセージを受け取ることです。私の頭に浮かぶ最初のことは、キューをトピックに変換することです。そのため、現在の消費者と新しい消費者は購読して、同じメッセージをすべてのユーザーに配信できます。

これは明らかに、物事のプロデューサー側とコンシューマー側の両方で現在のクライアントコードを変更することを含むでしょう。

また、既存のコンシューマを変更する必要がないように、2番目のキューを作成するなどの他のオプションも検討したいと思います。私は、このアプローチでは、2つの異なるキュー間で負荷を均衡させる(私が間違っていれば修正する)ような利点があると信じています。これはパフォーマンスにプラスの影響を与えます。

私はこれらのオプションと反対意見をお伝えしたいと思います。どんなフィードバックも高く評価されます。

答えて

45

あなたが述べたようにいくつかの選択肢があります。

トピックに変換して同じ効果を得る場合は、コンシューマを永続コンシューマにする必要があります。あなたの消費者が生きていなければ、キューが提供するものの1つは永続性です。これは、使用しているMQシステムによって異なります。

キューを使用する場合は、各コンシューマ用のキューと、元のキューで待機するディスパッチャを作成します。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

トピック

  • 賛否動的に新しい消費者を追加する方が簡単。すべての消費者は、仕事をしなくても新しいメッセージを受け取ることができます。
  • Consumer_1がメッセージを取得し、次にConsumer_2、Consumer_3を取得できるようにラウンドロビンのトピックを作成できます。
  • 消費者はキューにクエリを発行しなくても反応することができます。あなたのブローカーは、この構成をサポートしていない限り、トピックの

短所

  • メッセージは永続的ではありません。消費者がオフラインになって戻ってくると、永続的なコンシューマが設定されていない限り、メッセージを見逃してしまう可能性があります。
  • Consumer_1とConsumer_2がConsumer_3ではなくメッセージを受信するのが難しい。ディスパッチャーとキューでは、DispatcherはConsumer_3のキューにメッセージを入れることができません。キュー

    賛否消費者が

  • ディスパッチャは、各消費者のキューにメッセージを配置しないことにより、そのメッセージを取得した消費者がフィルタリングすることができ、それらを削除するまでメッセージは永続的です。これは、フィルタを通してトピックを使って行うことができます。キュー

短所

  • 追加のキューは、複数の消費者をサポートするために作成する必要があります。動的な環境では、これは効率的ではありません。

メッセージングシステムを開発するときは、トピックを優先しますが、すでにキューを使用しているときにトピックを実装する方法を変更する必要があります。

、複数の消費者との設計・キューシステムの構築

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

ソース

あなたは、問題の例外処理などの世話をする必要があります他のものは、ある点に注意してくださいあなたが接続を失った場合、接続とキューへの再接続。これは、私が記述したものを達成する方法のアイデアを与えるために設計されています。

実際のシステムでは、私はおそらく最初の例外で終了しないでしょう。私は、システムが可能な限り最良の状態で動作し続け、エラーを記録できるようにします。このコードでは、単一のコンシューマキューにメッセージを格納できない場合、ディスパッチャ全体が停止します。

Dispatcher.java

/* 
* To change this template, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package stackoverflow_4615895; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.QueueConnection; 
import javax.jms.QueueConnectionFactory; 
import javax.jms.QueueSession; 
import javax.jms.Session; 

public class Dispatcher { 

    private static long QUEUE_WAIT_TIME = 1000; 
    private boolean mStop = false; 
    private QueueConnectionFactory mFactory; 
    private String mSourceQueueName; 
    private String[] mConsumerQueueNames; 

    /** 
    * Create a dispatcher 
    * @param factory 
    *  The QueueConnectionFactory in which new connections, session, and consumers 
    *  will be created. This is needed to ensure the connection is associated 
    *  with the correct thread. 
    * @param source 
    * 
    * @param consumerQueues 
    */ 
    public Dispatcher(
     QueueConnectionFactory factory, 
     String sourceQueue, 
     String[] consumerQueues) { 

     mFactory = factory; 
     mSourceQueueName = sourceQueue; 
     mConsumerQueueNames = consumerQueues; 
    } 

    public void start() { 
     Thread thread = new Thread(new Runnable() { 

      public void run() { 
       Dispatcher.this.run(); 
      } 
     }); 
     thread.setName("Queue Dispatcher"); 
     thread.start(); 
    } 

    public void stop() { 
     mStop = true; 
    } 

    private void run() { 

     QueueConnection connection = null; 
     MessageProducer producer = null; 
     MessageConsumer consumer = null; 
     QueueSession session = null; 
     try { 
      // Setup connection and queues for receiving the messages 
      connection = mFactory.createQueueConnection(); 
      session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
      Queue sourceQueue = session.createQueue(mSourceQueueName); 
      consumer = session.createConsumer(sourceQueue); 

      // Create a null producer allowing us to send messages 
      // to any queue. 
      producer = session.createProducer(null); 

      // Create the destination queues based on the consumer names we 
      // were given. 
      Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; 
      for (int index = 0; index < mConsumerQueueNames.length; ++index) { 
       destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); 
      } 

      connection.start(); 

      while (!mStop) { 

       // Only wait QUEUE_WAIT_TIME in order to give 
       // the dispatcher a chance to see if it should 
       // quit 
       Message m = consumer.receive(QUEUE_WAIT_TIME); 
       if (m == null) { 
        continue; 
       } 

       // Take the message we received and put 
       // it in each of the consumers destination 
       // queues for them to process 
       for (Queue q : destinationQueues) { 
        producer.send(q, m); 
       } 
      } 

     } catch (JMSException ex) { 
      // Do wonderful things here 
     } finally { 
      if (producer != null) { 
       try { 
        producer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (consumer != null) { 
       try { 
        consumer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (session != null) { 
       try { 
        session.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (connection != null) { 
       try { 
        connection.close(); 
       } catch (JMSException ex) { 
       } 
      } 
     } 
    } 
} 

Main.java良い選択肢でしょう

QueueConnectionFactory factory = ...; 

    Dispatcher dispatcher = 
      new Dispatcher(
      factory, 
      "Queue_Original", 
      new String[]{ 
       "Consumer_Queue_1", 
       "Consumer_Queue_2", 
       "Consumer_Queue_3"}); 
    dispatcher.start(); 
+0

+1良い答え。 – skaffman

+0

それは素晴らしい答えでした。私はHornetQであるJBossのMOM実装を使用しています。 –

+0

@Anonimo前回JBossがJMS仕様に準拠していることを確認しました。これは、JMS仕様では説明していないトピックを動的に作成するため、過去の私の欲求不満を引き起こしました。 ActiveMQのような他のものは、トピックを動的に作成することができ、同じ機能を可能にするためにJBossで1行のコード変更だけが必要でした。 –

4

コードを変更する必要はありません。それはあなたがそれを書いた方法に依存します。

たとえば、コードがQueueSenderではなくMessageProducerを使用してメッセージを送信する場合は、トピックとキューの両方で機能します。同様にQueueReceiverではなくMessageConsumerを使用した場合も同様です。

基本的に、それはそのような場合、それは設定の「単なる」問題だなどMessageProducerMessageConsumerDestination、として、JMSシステムと対話するために、非特定のインターフェイスを使用するJMSアプリケーションでは良い習慣です。

+0

。残念ながら、QueueSenderのような特定のインターフェースを使用しています。私たちがリファクタリングしているなら、これは間違いなく私が念頭に置くものです。 –

関連する問題