2017-02-06 8 views
0

私は、Spring統合dslを使用して、次のSpring統合プロジェクトをjava configバージョンに変換しようとしています。私は多くの運を持っていないし、私はdslに関するドキュメントを見つけることができません。Spring IntegrationFlow amqpキューへのhttpリクエスト

ここは私が変換しているプロジェクトです。それはxmlの設定を使用します。 https://github.com/dsyer/http-amqp-tunnel

基本的にhttp要求を受け取り、もう一方のターゲットアプリケーションにrabbitmq経由でトンネルします。プロジェクトが何をすべきかについての良い説明は、上のリンクで見つけることができます。

私のアプリと上記のgithubの主な違いは、私が春のブート1.5.1.RELEASEと元の1.1.4.BUILD-SNAPSHOTに基づいていることです。また、元のプロジェクトでは、int-http:inbound-gateway、int-http:outbound-gateway、int-amqp:outbound-gateway、int-amqp:inbound-gatewayを使用していますが、 IntegrationFlow dslをJava設定で使用します。

私のコードでは、RabbitMQにメッセージが表示されないので、ブラウザでタイムアウト例外が発生するため、IntegrationFlowの設定が正しくないと思います。私はリクエストをログに記録するワイヤタップを追加しました。ブラウザからアプリを起動すると、ワンタッチの出力しか表示されません。

正しい方向のちょっとしたナッジが大変ありがとうございます。

更新設定とエラー

package org.springframework.platform.proxy; 

import org.springframework.amqp.core.*; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.beans.factory.annotation.*; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; 
import org.springframework.context.annotation.*; 
import org.springframework.integration.config.EnableIntegration; 
import org.springframework.integration.dsl.*; 
import org.springframework.integration.dsl.amqp.Amqp; 
import org.springframework.integration.dsl.http.Http; 
import org.springframework.integration.handler.LoggingHandler; 
import org.springframework.messaging.MessageHandler; 
import org.springframework.web.client.RestTemplate; 

@Configuration 
@ComponentScan 
@EnableAutoConfiguration 
@EnableIntegration 
public class TunnelApplication 
{ 
    public static void main(String[] args) 
    { 
     SpringApplication.run(TunnelApplication.class, args); 
    } 

    @Value("${urlExpression}") 
    private String urlExpression; 

    @Value("${targetUrl}") 
    private String targetUrl; 

    @Value("${outboundQueue}") 
    private String outboundQueue; 

    @Value("${inboundQueue}") 
    private String inboundQueue; 

    @Autowired 
    private ConnectionFactory rabbitConnectionFactory; 

    @Bean 
    public Queue requestQueue() 
    { 
     return new Queue(outboundQueue, true, false, true); 
    } 

    @Bean 
    public Queue targetQueue() 
    { 
     return new Queue(inboundQueue, true, false, true); 
    } 

    @Bean 
    public RestTemplate safeRestTemplate() 
    { 
     return new RestTemplate(); 
    } 

    @Bean 
    public Jackson2JsonMessageConverter jsonMessageConverter() 
    { 
     return new Jackson2JsonMessageConverter(); 
    } 


    @Bean 
    public AmqpTemplate amqpTemplate() 
    { 
     RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory); 
     result.setMessageConverter(jsonMessageConverter()); 
     return result; 
    } 

    @Bean 
    public IntegrationFlow httpInboundGateway() 
    { 
     return IntegrationFlows 
       .from(Http.inboundGateway("/tunnel")) 
       .handle(
         Amqp.outboundAdapter(amqpTemplate()) 
          .mappedRequestHeaders("http_*") 
          .routingKey(outboundQueue) 
//       .routingKeyExpression("headers['routingKey']") 
         ) 
       .wireTap(f->f.handle(logger("outbound"))) 
       .get(); 
    } 

    @Bean 
    public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) 
    { 
     return IntegrationFlows.from 
       (
        Amqp.inboundGateway(connectionFactory, inboundQueue) 
         .mappedRequestHeaders("http_*") 
         .messageConverter(jsonMessageConverter()) 
       ) 
       .handle(Http.outboundGateway(targetUrl)) 
       .wireTap(f->f.handle(logger("inbound"))) 
       .get(); 
    } 


    @Bean 
    public MessageHandler logger(String name) 
    { 
     LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.INFO.name()); 
     loggingHandler.setLoggerName(name); 
     return loggingHandler; 
    } 
} 

次のエラーメッセージが印刷されますし続け、アプリが実行されている間のRabbitMQの上にとどまるのメッセージがあります。それはそれを引っ張って、エラーを得て、それを元に戻すように見えます。それは私の心配です。なぜなら、エラーが発生して元のクライアントに伝播し、サーバーを動かさないようにしたいからです。

2017-02-06 16:00:12.167 INFO 10264 --- [nio-9000-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]  : Initializing Spring FrameworkServlet 'dispatcherServlet' 
2017-02-06 16:00:12.167 INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet  : FrameworkServlet 'dispatcherServlet': initialization started 
2017-02-06 16:00:12.190 INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet  : FrameworkServlet 'dispatcherServlet': initialization completed in 23 ms 
2017-02-06 16:00:16.806 INFO 10264 --- [erContainer#0-1] outbound         : <200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}> 
2017-02-06 16:00:16.810 WARN 10264 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. 

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na] 
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_66] 
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test:9000.amqpInboundGateway.channel#1'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=<200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}>, headers={http_requestMethod=GET, replyChannel=org.springframewor[email protected]2eb9b1c6, errorChannel=org.springframewor[email protected]2eb9b1c6, amqp_consumerQueue=request, http_requestUrl=http://localhost:9000/tunnel/, id=bcb94ed9-45fc-c333-afee-de6e20a9f1b5, Content-Length=14, amqp_consumerTag=amq.ctag-ncEDSKdgWNKQk-jhGfqsbw, contentType=text/html;charset=UTF-8, http_statusCode=200, Date=1486418416000, timestamp=1486418416805}] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$400(AmqpInboundGateway.java:52) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$1.onMessage(AmqpInboundGateway.java:154) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na] 
    ... 10 common frames omitted 
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE] 
    ... 35 common frames omitted 

コンフィグ春ブーツアクチュエータの/豆エンドポイントにヒットするように変更ゲイリーさんのコメント に基づきます。

答えて

0
@SpringBootApplication 
public class So42077149Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So42077149Application.class, args); 
    } 

    @Bean 
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) { 
     return IntegrationFlows.from(Http.inboundGateway("/foo")) 
       .log() 
       .handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName())) 
       .log() 
       .bridge(null) 
       .get(); 
    } 

    @Bean 
    public Queue queue() { 
     return new AnonymousQueue(); 
    } 

    @Bean 
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue())) 
       .log() 
       .handle(Http.outboundGateway("http://localhost:8080/bar") 
         .expectedResponseType(String.class)) 
       .log() 
       .bridge(null) 
       .get(); 
    } 

    @Bean 
    public IntegrationFlow finalWeb() { 
     return IntegrationFlows.from(Http.inboundGateway("/bar")) 
       .log() 
       .<String, String>transform(String::toUpperCase) 
       .log() 
       .bridge(null) 
       .get(); 
    } 


} 

結果:

$ curl -H "Content-Type: text/plain" -d foo localhost:8080/foo 
FOO 

EDIT

とJSONと ...

otApplication パブリッククラスSo42077149Application {

public static void main(String[] args) { 
     SpringApplication.run(So42077149Application.class, args); 
    } 

    @Bean 
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) { 
     return IntegrationFlows.from(Http.inboundGateway("/foo")) 
       .log() 
       .handle(Amqp.outboundGateway(amqpTemplate) 
         .routingKey(queue().getName()) 
         .mappedRequestHeaders("*") 
         .mappedReplyHeaders("*")) 
       .log() 
       .bridge(null) 
       .get(); 
    } 

    @Bean 
    public Queue queue() { 
     return new AnonymousQueue(); 
    } 

    @Bean 
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue())) 
       .log() 
       .handle(Http.outboundGateway("http://localhost:8080/bar") 
         .mappedRequestHeaders("*") 
         .mappedResponseHeaders("*") 
         .httpMethod(HttpMethod.GET) 
         .expectedResponseType(Map.class)) 
       .log() 
       .log(Level.INFO, "payloadClass", "payload.getClass()") 
       .bridge(null) 
       .get(); 
    } 

    @Bean 
    public IntegrationFlow finalWeb() { 
     return IntegrationFlows.from(Http.inboundGateway("/bar")) 
       .log() 
       .transform("{ \"foo\" : \"bar\" }") 
       .enrichHeaders(h -> h.header("contentType", "application/json")) 
       .log() 
       .bridge(null) 
       .get(); 
    } 

} 
+0

ゲーリー私はあなたの設定を試しました。 RestTemplateのjsonメッセージにメッセージを変換するコンバータが必要なようです。どのコンポーネントをプラグインするべきかについてのガイダンスはありますか? org.springframework.web.client.RestClientException:要求を書き込めませんでした:要求タイプ[org.springframework.util.LinkedMultiValueMap]とコンテンツタイプ[application/x-java-serialized-object]に対して適切なHttpMessageConverterが見つかりませんでした。 –

+0

一方、私は文字通りブラウザからGETをやっているので、ブラウザからの休憩要求ではありません。 –

+0

あなたのコメントにも基づいて私の更新された設定を入れました。実際にアクチュエータのエンドポイントに当たるようにURLを修正しました。 –

1

Amqp.outboundGatewayを使用する必要があります。それは、Daveはそのサンプルで持っているものです。

<int-amqp:outbound-gateway request-channel="outbound" 
     routing-key="${outboundQueue}" mapped-request-headers="http_*" /> 

プラス、見て、あなたはここでDaveのロジックによると、outboundQueueでなければならないroutingKeyを見逃しています。

Http.inboundGateway

Amqp.outboundGatewayIntegrationFlowに組み合わせることが可能である。

@Bean 
public IntegrationFlow clientGateway() { 
    return IntegrationFlows 
      .from(Http.inboundGateway("/tunnel")) 
      .handle(Amqp.outboundGateway(amqpTemplate) 
         .mappedRequestHeaders("http_*") 
         .routingKey(outboundQueue)) 
      .get(); 
} 

サーバ部分は同様単一IntegrationFlowに組み合わせることができます。 そしてそのコンポーネントは私のためによく見えます。

あなたは本当にあなたの休憩サービスからの返信を期待しています。したがって、すべての下流のコンポーネントは要求/応答でなければなりません。

もう一度デザインを見てみましょう。

------------- HTTP ------------- AMQP ------------- AMQP ------------- HTTP -------------- 
| local app | <------> | client | <------> | broker | <------> | server | <------> | target app | 
-------------   -------------   -------------   -------------   -------------- 
+0

私の答えも参照してください。 –

+0

情報ありがとうございます。私はそれ以上になると思う。あなたのアドバイスに基づいて私の新しい設定を含めるようにオリジナルの質問を更新しました。私は今、ログに繰り返されるエラーを取得しています。 –

+0

「Amqp.outboundAdapter'が表示されますが、Amqp.outboundGatewayは表示されません。暗黙的なチャンネルのない 'wireTap'の場合は、Garyが示唆するように' .bridge(null) 'を使用する必要があります。あなたが 'wireTap'があなたがそれを指摘する場所でチャネルに適用され、フロー全体ではなく適用されることに注意してください。 –

関連する問題