1

異なるシステムからメッセージブローカー(現在はJMS)経由でメッセージを受信するシステムを構築しています。すべての送信者システムからのすべてのメッセージにはdeviceIdがあり、メッセージの受信には順序がありません。 たとえば、システムAはdeviceId = 1のメッセージを送信でき、システムb beはdeviceId = 2のメッセージを送信できます。統合パターン:複数のシステムから受信した処理メッセージを同期する方法

私は、同じdeviceIdを持つすべての送信者からすべてのメッセージを受け取らない限り、同じdeviceIdに関するメッセージの処理を開始しないことを目標にしています。例えば

、私は3つの系統A、BおよびCは、私のシステムにメッセージを送信している場合:

System A sends messageA1 with deviceId=1 
System B sends messageB1 with deviceId=1 
System C sends messageC1 with deviceId=3 
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1. 

この問題は、メッセージ・ブローカまたはANで、私のシステムでは、いくつかの同期メカニズムを使用することによって解決しなければならないが春の統合/ apacheのラクダのような統合フレームワーク?

+0

サービスがシステムから同じdeviceIdを持つ2つのメッセージを受信して​​も問題はありませんか?別々のメッセージを1つずつ処理しますか?例(deviceId = 1): 'messageA1、messageA1、messageA1、messageC1、messageC1、messageB1' - >プロセスが起動しました – mgyongyosi

+0

いいえ問題ありません...重要なのは、処理しなければならないデータが含まれているため、3つのメッセージがすべて到着したときに処理を開始することです –

答えて

2

アグリゲータ(@Artem Bilanが述べたもの)と同様のソリューションは、プロパティを使用してカスタムAggregationStrategyとアグリゲータの完了を制御してCamelで実装することもできます。

以下は、適切な出発点になる場合があります。 (You can find the sample project with tests here

ルート:

from("direct:start") 
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}") 
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3)) 
    .log(LoggingLevel.INFO, "Signaled body: ${body}") 
    .to("direct:result"); 

SignalAggregationStrategy.java

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate { 

    private int numberOfSystems; 

    public SignalAggregationStrategy(int numberOfSystems) { 
     this.numberOfSystems = numberOfSystems; 
    } 

    @Override 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     Exchange exchange = super.aggregate(oldExchange, newExchange); 

     List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class); 

     // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different) 
     // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy 
     if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) { 
      exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true); 
     } 

     return exchange; 
    } 

    @Override 
    public boolean matches(Exchange exchange) { 
     // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion) 
     return false; 
    } 
} 

はそれが役に立てば幸い!

1

キャッシングコンポーネントを使用してApache Camelでこれを行うことができます。 EHCacheコンポーネントがあると思います。

基本的に:

  1. あなたが与えられたデバイスIDを持つメッセージを受け取るには、deviceId1を言います。
  2. キャッシュ内を検索して、deviceId1で受信したメッセージを確認します。
  3. 3つをすべて受け取らない限り、現在のシステム/メッセージをキャッシュに追加します。
  4. すべてのメッセージがあると、キャッシュを処理してクリアします。

その後、受信した各メッセージを、一時的な保存用の特定のdeviceIdベースのキューにルーティングすることができます。これは、JMS、ActiveMQなどとすることができます。

1

春の統合では、この種のタスクのためのコンポーネントが提供されます。グループ全体が収集されるまでは放出しません。名前はAggregatorです。 deviceIdは間違いなくcorrelationKeyです。 releaseStrategyは実際にはシステムの数に基づいている可能性があります - どのくらいのメッセージが次のステップに進む前に待っていますか。deviceId1

関連する問題