2016-10-05 2 views
1

メッセージを別のキューにプッシュするスプリッタプロセスがあります。さらなる処理のために、これらのメッセージを収集し、集約する別のプロセスがあります。スプリッタ/アグリゲータ(ファイア/忘却とタイムアウトあり)

分割と集約の間にタイムアウトが必要です。 IIUC集約タイムアウトは、最初のメッセージから始まり、すべての集約メッセージの後にリセットされます(完全なメッセージではなく間隔ベースです)。

これを解決する最適なソリューションは何ですか?

+0

だから、あなたは、メッセージが分割されたので、NNNN秒が経過した場合は、アグリゲータは、特定の集約が完了したと仮定したいですか?あれは正しいですか? –

+0

はい、「メッセージバンドル」を分割して集計する合計時間は、NNNN秒を超えてはなりません。 –

+0

ああ、私はその質問を完全に誤解していました。あなたはしばらくの間、集計を延期したいと思っていました。 –

答えて

0

EDIT

は、ここで私はそれがハッキングのビットだが、思い付くことができたのが最善です。まず、あなたはメッセージヘッダとしてタイムスタンプを保存し、身体をキューに公開:

from("somewhere") 
    .split(body()) 
    .process(e -> e.getIn().setHeader("aggregation_timeout", 
     ZonedDateTime.now().plusSeconds(COMPLETION_TIMEOUT))) 
    .to("aggregation-route-uri"); 

次に、消費し、集約するとき、あなたが最初のメッセージからaggregation_timeoutを救うカスタム集計戦略を使用しますその値を読み取ってタイムアウトが期限切れになっているかどうかを確認するcompletionPredicateを使用します(あるいは、メッセージの順序がそのまま維持されるように集約している場合は、最初のメッセージからヘッダーを読み取るだけです)。二つのメッセージ間の間隔が長いときのケースのための安全装置として短いcompletionTimeoutを使用します。

from("aggregation-route-uri") 
    .aggregate(bySomething()) 
    .aggregationStrategy((oldExchange, newExchange) -> { 
     // read aggregation_timeout header from first message 
     // and set it as property in grouped exchange 
     // perform aggregation 
    }) 
    .completionTimeout(1000) // intentionally low value, here as a safeguard 
    .completionPredicate(e -> { 
     // complete once the timeout has been reached 
     return e.getProperty("aggregation_timeout", ZonedDateTime.class) 
       .isAfter(ZonedDateTime.now()); 
    }) 
    .process(e -> // do something with aggregates); 
+0

スプリッターとアグリゲーターはどちらも独自のCamelContextを持っていると言いましたが、私たちは膨大な量のメッセージと外部システムによる長時間の実行を期待しています。集約情報は保持されます。私は提案されたソリューションが別々のCamelContextsで動作するとは思いませんか? –

+0

なぜそれは動作しないはずです、それは文脈にとらわれないです –

+0

メッセージバンドルが1つのメッセージしか含んでいない場合にはまだ問題があります。このようなメッセージは、タイムアウトが発生した場合にも処理(エラー処理)する必要があります。少なくともこれはすぐに使用可能なパターンがないことを確認します。 –

関連する問題