2017-05-04 17 views
2

私のキューには異なるソースシステムからのメッセージがあるというこの問題があります。異なるソースシステムと単一のコンシューマに対するアクティブなMQ同時実行

たとえば、最初のメッセージには「X」というソースシステム名があり、2番目のソースシステム名は「Y」です。

現在のところ、並行処理レベルが1のJMSリスナーがあります。すべてのメッセージが1つずつ処理されるようになっていますが、メッセージは同時に処理する必要があります。そのソースシステムに対して一度にメッセージを処理する必要があり、異なるソースシステム用のメッセージがある場合は、それらを並行して実行する必要があります。

ソースシステムが動的に作成されているため、ソースシステムごとにキューとコンシューマーを分離することができません。

誰かが私を正しい方向に押し込んでくれれば素晴らしいだろう。

答えて

0

あなたの問題は、特定のソースから発信されたメッセージの順序付き配信を維持することですが、さまざまなソースからのメッセージを並行して処理することができるように思えます。

message groupsを使用してこれを行うことができます。

メッセージ・アプリケーションは、ブローカーを使用して、関連する一連のメッセージをグループに属するものとして分類することができます。これにより、メッセージ・プロデューサは、メッセージ・グループがアプリケーションに対して単一の論理操作とみなされるべきであることを消費者に示すことができる。 。

この仕事をするために、生産システム生産システム名にJMSXGroupIDヘッダを設定している:

Mesasge message = session.createTextMessage("<message />"); 
message.setStringProperty("JMSXGroupID", "SourceSystem1Name"); 

その後ブローカは、そのグループに属するメッセージの中で、消費順序付けを実施します。

補遺

それらが動的に を作成しているように、ソース・システムのN番号があるかもしれませんし、キュー

に これらのNソース・システムからのすべてのメッセージを置くプロデューサーがあります

したがって、メッセージプロデューサはJMSXGroupIDヘッダーをソースシステムの名前に設定できます。

私が直面してる問題は、一つのソースシステムのためのメッセージが が処理取得されている場合は、他のソースシステムメッセージが

だから、一度そのメッセージの処理の 完了まで待つ必要がありますグループヘッダーが記述されているように設定されている場合、ブローカーは指定されたソースシステムのメッセージをコンシューマーに順番にリリースするだけです。これは、グループ内の次のメッセージを解放する前に、前のメッセージが処理されたことを確認するように消費者に強制することによって行われます。

並行性を適切な値に設定することで、コンシューマはさまざまなソースシステムからのメッセージを並行して処理できますが、必要な動作である任意のソースシステムからのメッセージを順次処理することが強制されます。

+0

JMSXGroupIDを設定することで、メッセージXをコンシューマが処理している(コンカレント= 1)メッセージを連続して処理するソースXが100%確実になることはありません。そして、その消費者が落ちるなら、それはすべてのメッセージを扱うもう1人です。私はそれがOPが望んでいるとは思わない。 –

+0

@ruffp私の前提は、OPは注文された配送にのみ関係しているということです。私は彼がある消費者から別の消費者へのグループの親和性の切り替えを気にしないと思っています。特定のソースシステムからのメッセージだけが順番に処理されます。メッセージグループを使用することにより、ブローカはグループ内のメッセージが順番に配信されることを保証します。 –

+0

@tom_redfern文章 'もし、異なるソースシステムのためのメッセージがある場合は、それらを並列に実行する必要があります。私は他のすべてのシステムを並行して扱わなければならないと考えさせてください。このグループでは、すべてのシステムインスタンスが順番に処理され、全体的に見て、すべてのシステムが並行して処理されます(例:すべてXはconsumer1に、Yはすべてconsumer2に、Zはすべてconsumer3になります)。 –

0

コンシューマはコンシューマ側で宣言されているため、別の変数に従って動的に変更することはできません。私はお勧めし

ソリューションは、セレクタを使用して、次のような2つの消費者の接続を使用することです:並列処理のための

消費者のエンドポイント(この場合は5)意志のようになります。

activemq:queue:MY_QUEUE_UNIQUE?concurrentConsumers=5&selector=MySourceHeader<>'X' 

消費者のエンドポイント連番の処理のためにルックスは次のようになります:

activemq:queue:MY_QUEUE_UNIQUE?concurrentConsumers=1&selector=MySourceHeader='X' 

これは、キューの1のフェッチサイズを定義するだけでなく良いでしょう(でも、私はSではありませんよそれは必須です)。

関連する問題