私は、非同期実行を伴うバネ統合フローを持っています。これは、ゲートウェイからコントローラへの値を返し、値を返した後も統合フローを継続します。ここでスプリング統合フローにおけるエラー処理のプラクティス
は、ゲートウェイである:
@MessagingGateway
public interface GW {
@Gateway(requestChannel = "f.input")
Task input(Collection<MessengerIncomingRequest> messages);
}
そして、ここでは、フローです:
@Bean
IntegrationFlow jFlow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
...
return pool;
}
@Bean
MessageChannel routerChannel() {
return MessageChannels
.publishSubscribe("routerChannel", executor())
.get();
}
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process()))
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@Bean
IntegrationFlow createFlow() {
return IntegrationFlows.from(
MessageChannels.direct("createChannel"))
.handle(routerService)
.get();
}
は、どのように私は、全体の流れのためのエラーハンドラを定義することができますか?ベストプラクティスは何ですか?私は、ゲートウェイメソッド呼び出しのtry/catchブロックを置くことができますが、channel(routerChannel())
の前に来るすべてのものについては、jFlow
フローで発生する例外をキャッチします。
残りのフローでエラーを処理するにはどうすればよいですか?または全体の流れのために?
UPDATE
私はpublishSubscribeChannel
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process())
.errorHandler(errorHandler))
.get();
}
のエラーハンドラを追加しましたが、例外の場合には、私は次のエラーを取得するためには、助けていないようです:
cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:
エラーハンドラが呼び出されません。
UPDATE
ゲイリーの回答によれば、私はこのコードを試みた:私はゲートウェイにexceptionChannel
を添加し、そして第二の脚部(非同期)にヘッダを濃縮移動別の編集後
@Bean
IntegrationFlow jFLow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
IntegrationFlow exceptionOrErrorFlow() {
return IntegrationFlows.from(
MessageChannels.direct("exceptionChannel"))
.handle(errorHandler, "handleError")
.get();
}
@Bean
MessageChannel exceptionChannel() {
return MessageChannels.direct("exceptionChannel")
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.enrichHeaders((spec) ->
spec.header("errorChannel", "exceptionChannel", true))
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@MessagingGateway(errorChannel = "exceptionChannel")
を私の流れのフローの同期部分に例外がスローされた場合、コントローラはブロックされます。
ゲイリー、すばらしい説明をありがとう!私はいくつかの事を明確にしたいと思います * 1。 '.enrichHeadersを使用して、ゲートウェイによって設定されたerrorChannelヘッダーを置き換える(必ず上書きをtrueに設定してください)。* - どのチャンネル名を置き換えるべきですか? * 2。 '明示的にdefaultErrorChannelを設定する' * - このdefaultErrorChannelをどのBean/Classに設定すればよいかヒントを教えてください。どこにも見つけられない。 –
私はあなたが何を意味しているのか理解したと思います。それに応じて私の答えを更新しました。問題はまだありますが、トンネルの終わりに光が見えると思います。 –
私の編集を参照してください - あなたはあまりにも早くヘッダーを変更しています。 –