2017-08-18 6 views
0

プロデューサがキャッシュからレコードを読み込み、キューに入れ、コンシューマがメッセージを消費して処理を続ける、私のノーツフローにJMSがあります。以下は理解のための流れです。JMSが完全な処理を完了するまで、Muleプロセスを保持する方法

サービス1(ファイルからデータを読む) - >サービス2(キャッシュ内の各ラインを入れて) - > JMSサービス3(プロデューサー読むラインによってキャッシュラインからのデータとキューに入れ)と消費者キューから読み取ります - >サービス上記フローでは4

は、JMSコンポーネントから、流れが故に非同期になり、すぐにプロデューサーがキュー応答内のすべてのレコードを置くなどの処理が完了したと言ってクライアントに戻りますが、消費者がまだメッセージを消費しようとしている可能性があります。

私は消費者がすべてのメッセージを消費するまで、プロデューサからプロセスを保持して応答を返送したいと考えています。

どのようにこれを達成するためのアイデアですか?

答えて

0

非同期は正確なスレッドとプロセスのコピーを独立して取得するため、プロデューサが実際にコンシューマが消費する前にメッセージをキューに入れている可能性があります。
メッセージをキューに入れるプロセスを保持すると考えることの1つは、その前にsleep()を置くことです。
Groovyコンポーネントを使用し、その中にsleep()を使用すると、フローを保持したり、処理を遅くすることができます。
次置く場合、たとえば、:キューにメッセージを入れる前

<scripting:component doc:name="Groovy"> 
    <scripting:script engine="Groovy"><![CDATA[ 
    sleep(10000); 
    return message.payload;]]> 
    </scripting:script> 
</scripting:component> 

を、プロセスが少し遅くなり、10000 msの流れを保持するには、実際には他の側に消費者を耕しそれを消費する。

0

上記のような完了ステータスのポーリングは正常に機能するかもしれませんが、待ち時間の後に完了しないトランザクションや、すべてのメッセージが処理されてからずっと待っているというリスクがあります。

この練習の最終目標によっては、すでにインバウンドリクエストを個々のメッセージに分割し、1つまたは複数のコンシューマスレッド内のメッセージを処理し、処理されたチャンクを追跡しているMuleバッチを活用することができます。すべてのデータが処理された後、結果を報告/最終ステップを実行します。

バッチを使用できず、処理されたメッセージを単一のリストまたはマップに再構成する必要がある場合は、コレクションアグリゲータが相関IDでメッセージを追跡し、タイムアウトを設定するジョブを実行できます。

DIYの実装方法は、JMSパブリッシングコンポーネント用のディスパッチャロジックを作成することです。すべてのメッセージをJMSに送信し、各コンシューマ/ワーカースレッドが同じ相関IDを持つ完了メッセージで(個別のJMSキューを介して)応答するのを待ちます。ディスパッチャは、メモリ内または永続的なストレージ内のすべてのサブミット/処理されたメッセージを追跡し、バッチ内の最後のメッセージが確認応答されるか、あらかじめ定義されたタイムアウトによって返答します。ミュールバッチがすでに行っていることに非常に近いものです。

乾杯! Dima

0

フローがJMSからの応答を待つように、要求パターンの値をリクエストレスポンスとして使用できます。

関連する問題