私は、Spring Integration DSLフローを使用して、残りのAPIからデータを取得し、変換して別の残りのAPIに送信します。Spring Integration Queueエラー処理
データがフェッチされた後、残りの処理を行うキューチャネルにメッセージが送信されます。キューが動作している間に、元のスレッドが移動してより多くのデータを取得します。
問題は、キューからスローされたエラーはすべてのデータの処理が完了するまで処理されないが、処理を停止してすぐにエラーをスローする必要があるためです。長い時間が、私はそれが最初に見つかったエラーで停止したい。
ゲートウェイ:
@MessagingGateway(errorChannel = "syncErrorChannel")
@Service
public interface CrmGateway {
@Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput")
@Payload("new String()")
Object syncDepartments();
}
フロー:
/**
* Fetches data from the source api and passes it on to the split channel to process it If the
* response indicates it has more data to fetch then it is also loaded
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow sync() {
return IntegrationFlows
.from("departmentSyncInput")
.handle(this::fetchDomain)
.enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()")))
.routeToRecipients(r -> r
.recipient("departmentSplitChannel")
.recipient(
"departmentSyncInput",
p -> p.getPayload() instanceof Wrapper
&& ((Wrapper) p.getPayload()).getNext() != null
))
.get();
}
/**
* Split data from the api into individual models and send them to the target service
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow split() {
return IntegrationFlows
.from("departmentSplitChannel")
.transform(Wrapper.class, Wrapper::getContent)
.split()
.channel(c -> c.executor(Executors.newScheduledThreadPool(100)))
.enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel"))
.handle((payload, headers) -> log("Syncing", payload, payload))
.transform(Department.class, transformer)
// exception happens here
.handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload))
.handle((payload, headers) -> log("Synced", payload, payload))
.aggregate()
.get();
}
エラーハンドラ:
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows
.from("syncErrorChannel")
.handle(Exception.class, (payload, headers) -> {
payload.printStackTrace();
return payload;
})
.get();
}
私も同じ結果とIntegrationFlows.from("errorChannel")
を使用してみました。
私もFuture
を使用しようとしましたが、同じように動作するので、get()
と呼んだときにエラーが発生しますが、これは最後に起こっています。
ありがとうございました。
ありがとうございました。 'c - > c.executor()'をキューと違うものにしていますか?これはXMLから変換したDSLですが、以前はSI DSLを使用していませんでした。 –
大きな高さから、それを処理する空きスレッドがない場合は、内部キューに格納されているエグゼキュータのタスクをフィードするので、実際にはキューになります。ポーリング固有の動作のためにQueueChannelとは何を話していますか –