2011-03-08 7 views
8

非常に多くのメッセージを受信するJMSキューがあります。有効なJMS処理

リスナーは、データベーストランザクションを使用してデータベースにメッセージを保存してから、JMSトランザクションをコミットする必要があります。

私はそれをより効果的に行うことができます。データベースを行う必要はありません。&各メッセージでJMSコミット。

答えて

6

非同期メッセージングの背後にある前提は、特にMDBを使用する場合、各メッセージがアトミックであることです。つまり、あるメッセージを処理した結果は、他のメッセージを処理した結果とは無関係であると考えられます。あなたの問題に対する理想的な解決方法は、このメッセージの原子性を保持します。

同じ作業単位で複数のメッセージを処理する場合は、このアトミック性が失われます。たとえば、25メッセージごとに同期ポイントを設定するとします。 25番目のメッセージにエラーが発生した場合(たとえば、キュ​​ーからのコード・ページの変換に問題があった場合など)、メッセージ・バッチ全体がバックアウトされます。彼らはすべて再配送されるだろう。メッセージの再配信回数は、読み取り/バックアウトサイクルごとに増加します。再配信回数がアプリサーバーで設定されたしきい値を超えたら、構成に応じて25個のメッセージはすべて廃棄または再キューされます。バッチが大きくなればなるほど、バッチ全体が共存したり死んだりするため、エラー状態ではより多くのメッセージが影響を受ける可能性があります。バッチサイズを100に設定すると、単一の毒メッセージが発生した場合でも100のメッセージが危険にさらされます。

代わりの解決策は、MDB内に多くの処理スレッドを許可することです。 JMSを使用すると、同じ接続の下で多くのセッションを生成できます。各セッションは独自の作業単位を管理できるため、各セッションは独立してXAトランザクションを開始し、メッセージを取得し、データベースを更新してからトランザクションをコミットできます。 1つのメッセージが悪い場合、そのメッセージとデータベースの更新のみが影響を受けます。

これには例外があります。たとえば、大きなバッチを処理し、すべてのメッセージが同じプロデューサから発信された場合、MDB以外のものを使用して多くのメッセージをフェッチし、同じ作業単位で多くの行を更新するのが一般的です。同様に、メッセージがシーケンスに依存する場合、シーケンスを保存しないため、並列処理は不可能です。しかし、再び、配列依存のメッセージは原子的ではありません。この場合も、MDBは理想的なソリューションではありません。

トランスポートプロバイダによっては、サポートされるスレッドの数がメモリの記憶域によってのみ制限される場合があります。例えば、WebSphere MQは、待ち行列上の数百の同時ゲッター・スレッドを容易に処理できます。アプリケーションサーバーのMDB構成のチューニングを調べて、スレッド数を確認してから、トランスポートが負荷を処理できることを確認します。次に、最適なスレッド数を見つけるためにちょっと遊んでください。スレッドは1つ増えてもパフォーマンスは劇的に向上しますが、1つのポイントまでしか増加しません。これを過ぎると、一般にプラトーが表示され、スレッド管理オーバーヘッドとしての低下がパフォーマンスの向上を相殺します。 sweetスポットが存在する場所は、メッセージングブローカーがどれだけ負荷がかかっているか、CPU、メモリ、ディスク、またはネットワークによって最も制約されているかどうかによって異なります。

8

各メッセージでそれを行うのではなく、バッチで行います。 JMSは、DBと同じようにトランザクションをサポートします。 JMSトランザクションを開始し、N個のメッセージを読み取ります。 DBトランザクションを開始し、N個のメッセージを挿入します。 JMSにコミットし、DBにコミットします。

これは明らかに競合が発生する(2つのコミット間でクラッシュが発生する)ウィンドウを導入します。あなたは今それを持っていますが、ただ一つのメッセージのためです。この問題を解決したい場合は、XAトランザクション(2段階コミット)か、少なくとも何らかの重複検出スキームのいずれかを見て、直面しています。 http://activemq.apache.org/should-i-use-xa.html

+0

コールバックメソッド "onMessage"は一度に1つのメッセージしか返さないため、どのようにNメッセージを取得できますか。 – changed

+3

私はMessageListenerインターフェイスを使用せず、メッセージを受信したときにだけ動作します。あなたはこれを行うことができます(メンバ変数を介して受け取ったメッセージの数、トランザクションの開始とコミットなど)。メッセージに依存してアクションをトリガーするため、競合状態ウィンドウを拡張しています。それは本当に最善の方法ではありません。あなたは、メッセージをキューから読んだり(タイムアウトまたは非ブロックの呼び出しをブロックする)、N個のメッセージやY時間が経過したときにコミットを行う伝統的なループを行うほうがはるかに優れています。 –

+1

申し訳ありません - 具体的には、MessageListenerをsetMessageListener()に登録するのではなく、receive(timeout)とreceiveNoWait()というMessageConsumerインターフェイスメソッドを使用しています。 –

0

ここには、あるキューからメッセージを取り出し、リストに追加して別のキューにプッシュバックするjmsプロセッサがあります。あなたは、値がそれぞれの方法で読み込むと集計されている方法を調整することができます

public class JmsBatcher<T> { 
    final Session session; 
    private final MessageConsumer consumer; 
    private final MessageProducer producer; 
    private final int batchSize; 


    public JmsBatcher(final Connection connection, 
         final String sourceQueue, 
         final String destQueue, 
         final int batchSize) throws JMSException { 
     this.batchSize = batchSize; 
     session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
     final Queue source = session.createQueue(sourceQueue); 
     final Queue dest = session.createQueue(destQueue); 
     consumer = session.createConsumer(source); 
     producer = session.createProducer(dest); 
    } 

    public void processBatch() { 
     final List<T> values = new ArrayList<>(); 
     try { 
      while (values.size() < batchSize) { 
       final Message message = consumer.receive(); 
       values.add(readObject(message)); 
       message.acknowledge(); 
      } 
      producer.send(createAggregate(values)); 
      session.commit(); 
     } catch (Exception e) { 
      // Log the exception 
      try { 
       session.rollback(); 
      } catch (JMSException re) { 
       // Rollback failed, so something fataly wrong. 
       throw new IllegalStateException(re); 
      } 
     } 
    } 

    private Message createAggregate(final List<T> values) throws JMSException { 
     return session.createObjectMessage((Serializable) values); 
    } 

    private T readObject(final Message message) throws JMSException { 
     return (T) ((ObjectMessage) message).getObject(); 
    } 
} 

これは別のスレッドで開始され、永遠に実行することができます:

final JmsBatcher jmsBatcher = 
    new JmsBatcher(connection, "single", "batch", 25); 
new Thread(() -> { 
    while (true) { 
     jmsBatcher.processBatch(); 
    } 
}).start(); 

あなたは、データベースにコミットすることができますバッチ処理された結果からバッチ処理されます。失敗した場合、トランザクションは再試行されます。

関連する問題