私は、ファイルが特定のフォルダのサブディレクトリ、ファイルが存在するクライアントを識別するサブディレクトリ、そしてレコードが解析されるファイルインポート・プロセスを実行しようとしています。スプリットし、Hazelcast SEDAキューに送信します。各レコードをHazelcast SEDAキューの読み取り値として処理したい場合は、集計できるステータスコード(作成、更新、またはエラー)を返します。Apache Camelスプリッタとヘイルキャスト・セダ・キュー
ファイルを最初に取得したときにジョブレコードを作成しています。作成、更新、およびエラーの最終カウントでジョブレコードを更新したいとします。
以下のJobProcessorは、このレコードを作成し、クライアントのOrganizationオブジェクトとJobオブジェクトをメッセージのヘッダーに設定します。 CensusExcelDataFormatはExcelファイルを読み取り、各行のEmployeeオブジェクトを作成してから、Collectionを返します。
from("file:" + censusDirectory + "?recursive=true").idempotentConsumer(new SimpleExpression("file:name"), idempotentRepository)
.process(new JobProcessor(organizationService, jobService, Job.JobType.CENSUS))
.unmarshal(censusExcelDataFormat)
.split(body(), new ListAggregationStrategy()).parallelProcessing()
.to(ExchangePattern.InOut, "hazelcast:seda:process-employee-import").end()
.process(new JobCompletionProcessor(jobService))
.end();
from("hazelcast:seda:process-employee-import")
.idempotentConsumer(simple("${body.entityId}"), idempotentRepository)
.bean(employeeImporterJob, "importOrUpdate");
問題は、リストの集計がすぐに行われ、ステータスのリストを取得するのではなく、同じEmployeeオブジェクトのリストを取得していることです。 EmployeeオブジェクトをSEDAキューに送信し、キューの処理からの戻り値を集計して、JobCompletionProcessorを実行してジョブレコードを更新します。
私は考えていませんそれはスプリッタと関係があります。これはローカルで実行するHazelcast SEDAキューを使用せずに実装しました。これは動作しています: –
'from(" file: "+ censusDirectory +"?recursive = true ")。idempotentConsumer(new SimpleExpression(" file:name ")、idempotentRepository) .process(新しいJobPreProcessor(organizationService、jobService、Job.JobType.CENSUS)) .unmarshal(censusExcelDataFormat) .split(本体()、新しいListAggregationStrategy()) .parallelProcessing() .bean(employeeImporterJob、「processItem ") .end() .bean(新規JobCompletion(jobService)、" completeJob ") .end();' –