2016-03-27 15 views
6

私は、非同期実行を伴うバネ統合フローを持っています。これは、ゲートウェイからコントローラへの値を返し、値を返した後も統合フローを継続します。ここでスプリング統合フローにおけるエラー処理のプラクティス

は、ゲートウェイである:

@MessagingGateway 
public interface GW { 

    @Gateway(requestChannel = "f.input") 
    Task input(Collection<MessengerIncomingRequest> messages); 

} 

そして、ここでは、フローです:

@Bean 
IntegrationFlow jFlow() { 
     return IntegrationFlows.from(
     MessageChannels.executor("f.input", executor())) 
     .split() 
     .channel(MessageChannels.executor(executor())) 
     .transform(transformer) 
     .channel(routerChannel()) 
     .get(); 
} 

@Bean 
ThreadPoolTaskExecutor executor() { 
     ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); 
     ... 
     return pool; 
} 

@Bean 
MessageChannel routerChannel() { 
     return MessageChannels 
     .publishSubscribe("routerChannel", executor()) 
     .get(); 
} 

@Bean 
IntegrationFlow routerChannelFlow() { 
     return IntegrationFlows 
     .from(routerChannel()) 
     .publishSubscribeChannel(s -> s 
     .subscribe(f -> f.bridge(null)) 
     .subscribe(process())) 
     .get(); 
} 

@Bean 
IntegrationFlow process() { 
     return f -> 
     f.route(p -> p.getKind().name(), 
     m -> m.suffix("Channel") 
     .channelMapping(TaskKind.CREATE.name(), "create") 
     .... 
} 

@Bean 
IntegrationFlow createFlow() { 
     return IntegrationFlows.from(
     MessageChannels.direct("createChannel")) 
     .handle(routerService) 
     .get(); 
} 

は、どのように私は、全体の流れのためのエラーハンドラを定義することができますか?ベストプラクティスは何ですか?私は、ゲートウェイメソッド呼び出しのtry/catchブロックを置くことができますが、channel(routerChannel())の前に来るすべてのものについては、jFlowフローで発生する例外をキャッチします。

残りのフローでエラーを処理するにはどうすればよいですか?または全体の流れのために?

UPDATE

私はpublishSubscribeChannel

@Bean 
IntegrationFlow routerChannelFlow() { 
    return IntegrationFlows 
      .from(routerChannel()) 
      .publishSubscribeChannel(s -> s 
        .subscribe(f -> f.bridge(null)) 
        .subscribe(process()) 
        .errorHandler(errorHandler)) 
      .get(); 
} 

のエラーハンドラを追加しましたが、例外の場合には、私は次のエラーを取得するためには、助けていないようです:

cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: 

エラーハンドラが呼び出されません。

UPDATE

ゲイリーの回答によれば、私はこのコードを試みた:私はゲートウェイにexceptionChannelを添加し、そして第二の脚部(非同期)にヘッダを濃縮移動別の編集後

@Bean 
IntegrationFlow jFLow() { 
    return IntegrationFlows.from(
      MessageChannels.executor("f.input", executor())) 
      .split() 
      .channel(MessageChannels.executor(executor())) 
      .transform(transformer) 
      .channel(routerChannel()) 
      .get(); 
} 

@Bean 
IntegrationFlow exceptionOrErrorFlow() { 
    return IntegrationFlows.from(
      MessageChannels.direct("exceptionChannel")) 
      .handle(errorHandler, "handleError") 
      .get(); 
} 

    @Bean 
MessageChannel exceptionChannel() { 
    return MessageChannels.direct("exceptionChannel") 
      .get(); 
} 

@Bean 
IntegrationFlow process() { 
     return f -> 
     f.enrichHeaders((spec) -> 
        spec.header("errorChannel", "exceptionChannel", true)) 
     f.route(p -> p.getKind().name(), 
     m -> m.suffix("Channel") 
     .channelMapping(TaskKind.CREATE.name(), "create") 
     .... 
} 

@MessagingGateway(errorChannel = "exceptionChannel") 

を私の流れのフローの同期部分に例外がスローされた場合、コントローラはブロックされます。

答えて

4

まず、ゲートウェイの仕組みを説明しましょう。以下の解決策の理解に役立ちます。

要求メッセージは、replyChannelヘッダーとして追加される一意の一時応答チャネルを取得します。ゲートウェイが明示的なreplyChannelを持っていても、それは単に要求のreplyChannelにブリッジされます - それはゲートウェイが要求への応答をどのように相関させるかです。

ゲートウェイはまた、要求のerrorChannelヘッダーを同じ返信チャネルに設定します。こうすれば、たとえフローが非同期であっても、例外をゲートウェイに戻して、呼び出し側にスローしたり、ゲートウェイのエラーチャネル(指定されている場合)にルーティングすることができます。このルーティングはMessagePublishingErrorHandlerによって実行され、実行者をラップするErrorHandlingTaskExecutorに配線されています。

ゲートウェイに結果を返してから続行するので、そのゲートウェイの対話は「消費され」、replyChannelヘッダーに送信されるメッセージ(例外を含む)は何も受信されません。したがって、あなたが見ているログメッセージ。

したがって、1つの解決策は、独立フローに送信されたメッセージにerrorChannelヘッダーを修正することです。ゲートウェイによって設定されたerrorChannelヘッダーを置き換えるには、.enrichHeadersを使用してください(上書きをtrueに設定してください)。これはフロー内でできるだけ早く実行する必要があります。そのため、例外がそのチャネルにルーティングされます(エラーハンドラを登録することもできます)。

別の解決策は、MessagePublishingErrorHandlerに明示的にdefaultErrorChannelを設定し、errorChannelヘッダーを削除して、エラー処理の実行者を配線することです。

非同期エラールーティングでは、最初にヘッダーが検索されます。存在する場合は、エラーメッセージがそこにルーティングされます。ヘッダーがなく、MPEHにデフォルトのエラーチャネルがない場合は、メッセージはデフォルトのerrorChannelにルーティングされ、(通常は)LoggingChannelAdapterが登録されています。デフォルトのerrorChannelはpub/subチャネルですので、他のエンドポイントをサブスクライブすることができます。あなたは、パブリッシュ/サブスクライブする前にチャンネルを変えている

EDIT

ゲートウェイへの応答を少なくとも1つ取得することが重要です。 pub/subの片方の脚にエラーチャンネルだけを残して、もう片方の脚でそれを更新する必要があります。こうすることで、最初のレッグの例外が呼び出し側にスローされます(例外ハンドラへのルーティングなど、何らかのアクションを取る場合は、ゲートウェイにerrorChannelを追加できます)。 2番目のレッグのヘッダーを更新するだけで、例外がエラーハンドラに直接送られるようにする必要があります。

ゲートウェイのerrorChannelexceptionChannelに設定すると、両方の脚の例外がそこに移動します。

+0

ゲイリー、すばらしい説明をありがとう!私はいくつかの事を明確にしたいと思います * 1。 '.enrichHeadersを使用して、ゲートウェイによって設定されたerrorChannelヘッダーを置き換える(必ず上書きをtrueに設定してください)。* - どのチャンネル名を置き換えるべきですか? * 2。 '明示的にdefaultErrorChannelを設定する' * - このdefaultErrorChannelをどのBean/Classに設定すればよいかヒントを教えてください。どこにも見つけられない。 –

+0

私はあなたが何を意味しているのか理解したと思います。それに応じて私の答えを更新しました。問題はまだありますが、トンネルの終わりに光が見えると思います。 –

+0

私の編集を参照してください - あなたはあまりにも早くヘッダーを変更しています。 –

関連する問題