2017-09-28 17 views
1

私は、ファイルが特定のフォルダのサブディレクトリ、ファイルが存在するクライアントを識別するサブディレクトリ、そしてレコードが解析されるファイルインポート・プロセスを実行しようとしています。スプリットし、Hazelcast SEDAキューに送信します。各レコードをHazelcast SEDAキューの読み取り値として処理したい場合は、集計できるステータスコード(作成、更新、またはエラー)を返します。Apache Camelスプリッタとヘイルキャスト・セダ・キュー

ファイルを最初に取得したときにジョブレコードを作成しています。作成、更新、およびエラーの最終カウントでジョブレコードを更新したいとします。

以下のJobProcessorは、このレコードを作成し、クライアントのOrganizationオブジェクトとJobオブジェクトをメッセージのヘッダーに設定します。 CensusExcelDataFormatはExcelファイルを読み取り、各行のEmployeeオブジェクトを作成してから、Collectionを返します。

from("file:" + censusDirectory + "?recursive=true").idempotentConsumer(new SimpleExpression("file:name"), idempotentRepository) 
     .process(new JobProcessor(organizationService, jobService, Job.JobType.CENSUS)) 
     .unmarshal(censusExcelDataFormat) 
     .split(body(), new ListAggregationStrategy()).parallelProcessing() 
     .to(ExchangePattern.InOut, "hazelcast:seda:process-employee-import").end() 
     .process(new JobCompletionProcessor(jobService)) 
.end(); 

from("hazelcast:seda:process-employee-import") 
     .idempotentConsumer(simple("${body.entityId}"), idempotentRepository) 
     .bean(employeeImporterJob, "importOrUpdate"); 

問題は、リストの集計がすぐに行われ、ステータスのリストを取得するのではなく、同じEmployeeオブジェクトのリストを取得していることです。 EmployeeオブジェクトをSEDAキューに送信し、キューの処理からの戻り値を集計して、JobCompletionProcessorを実行してジョブレコードを更新します。

答えて

0

動作はデフォルト動作です。 apacheのラクダsplitter documentationは、スプリッタが返すセクションでこれを明確に述べています。

  1. キャメル2.2またはそれ以上の年齢:スプリッタは、デフォルトでは最後の 分割さのメッセージを返します。
  2. キャメル2.3以降:スプリッタは、デフォルトで 元の入力メッセージを返します。
  3. すべてのバージョン:独自の 戦略をAggregationStrategyとして指定すると、これを無効にすることができます。このページのサンプルは (分割集計リクエスト/回答サンプル)です。アグリゲータがサポートするのは同じ 戦略であることに注意してください。このスプリッタは、軽量アグリゲータを組み込んだ と見ることができます。

あなたが分かるように、独自のスプリッタアグリゲーション戦略を実装する必要があります。これを行うにはAggrgationStrategyに以下のコードのようなものを実装する新しいクラスを作成:

public class MyAggregationStrategy implements AggregationStrategy 
{ 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     if (oldExchange == null) //this would be null on the first exchange. 
     { 
      //do some work on the first time if needed 
     } 
     /* 
     Here you put your code to calculate failed, updated, created. 


     */ 
    } 
} 

をあなたは、次の例のように、それを指定することで、カスタム集計戦略を使用することができます。

.split(body(), new MyAggregationStrategy()) //Java DSL 
<split strategyRef="myAggregationStrategy"/> //XML Blueprint 
+0

私は考えていませんそれはスプリッタと関係があります。これはローカルで実行するHazelcast SEDAキューを使用せずに実装しました。これは動作しています: –

+0

'from(" file: "+ censusDirectory +"?recursive = true ")。idempotentConsumer(new SimpleExpression(" file:name ")、idempotentRepository) .process(新しいJobPreProcessor(organizationService、jobService、Job.JobType.CENSUS)) .unmarshal(censusExcelDataFormat) .split(本体()、新しいListAggregationStrategy()) .parallelProcessing() .bean(employeeImporterJob、「processItem ") .end() .bean(新規JobCompletion(jobService)、" completeJob ") .end();' –