2016-09-01 2 views
1

私はラクダアプリケーションを持っていますが、私はFTPソースからファイルを読み込んでいます。 ファイルが複数のルートを通過するように、1つのルートが保管のためにcassandraに行き、1つのルートがデータを処理し、ピボットデータをKafkaトピックにプッシュするなどアグリゲーターを使ってチェックファイルがすべてのラクダルートで処理されます

すべてのルートと到達最後まで。このようにして、ファイル名に基づいて処理済みログを構築することができます。私が考えることができる

一つの方法は、各ルートが交換ヘッダに完了通知を送信し、その後、処理され、アグリゲータで完了条件ロジックに基づいて、私はそのファイルをマークするアグリゲータを実現することです。

このようなアグリゲータはどのようにJavaで記述しますか?

答えて

3

multicastを試してみることができます。

from("direct:start") 
    .multicast() 
     .to("direct:a","direct:b") 
    .end() 
    // Won't run until the sub routes are complete 
    .process(new MarkFileAsCompletedProcessor()) 
    .log("Finished multicast"); 

from("direct:a") 
    .log("Processing a") 
    .to("mock:endOfA"); 

from("direct:b") 
    .log("Processing b") 
    .to("mock:endOfB"); 
+0

現在、私はマルチキャストを使用していません。 '( "ダイレクト:開始")から // .multicast() .TO( "ダイレクト:A"、 "ダイレクト:B") .END() が//サブルートまで実行されませんが完了しました。 。process(new MarkFileAsCompletedProcessor()) .log( "Finished multicast");あなたは(_end後に行うどのような処理だから、 ' 、)_、すべてのルートが完了している場合にのみ実行されますか?ご確認ください。 –

+0

修正。したがって、サンプルコードでは、 "MarkFileAsCompletedProcessor"は各サブルートが完了するまで呼び出されません。 –

+0

ありがとう@重力 –

関連する問題