2016-06-22 15 views
1

コールバックメッセージを持つ非同期RabbitMQメッセージに苦労しています。ここでRabbitTemplateコールバック付き非同期メッセージ

は私のプロデューサーの一部です:

@Autowired(required = false) 
@Qualifier("rabbitTemplate") 
private RabbitTemplate queueTemplate; 

@Override 
public Response createIncident(String incident) { 
    LOGGER.info("Sending incident into queue"); 
    queueTemplate.convertAndSend((Object)incident, new MessagePostProcessor() { 

     @Override 
     public Message postProcessMessage(Message message) throws AmqpException { 
      message.getMessageProperties().setReplyTo("replyQueue"); 
      return message; 
     } 
    }); 
    Message message = queueTemplate.receive(); 
... 

春のRabbitMQコンテキストこれは、内部取得する必要はありません

@Autowired(required = false) 
@Qualifier("rabbitTemplate") 
private RabbitTemplate queueTemplate; 

public void createIncident() { 
    queueTemplate.receiveAndReply("replyQueue", new ReceiveAndReplyMessageCallback() { 

     @Override 
     public Message handle(Message message) { 
      String incident = new String(message.getBody()); 
      LOGGER.debug("Queue incoming message: " + incident); 
      String result = "" + messageHandler.createIncident(incident); 
      return new Message(result.getBytes(), new MessageProperties()); 
     } 

    }); 

を受信して​​対応すべき

<rabbit:admin connection-factory="rabbitConnFactory" /> 

<rabbit:queue name="rabbitQueue" /> 
<rabbit:queue name="replyQueue" /> 

<rabbit:topic-exchange name="rabbitExchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="rabbitQueue" pattern="camel" /> 
     <rabbit:binding queue="replyQueue" pattern="camel" /> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 


<rabbit:template id="rabbitTemplate" 
    connection-factory="rabbitConnFactory" exchange="rabbitExchange" routing-key="camel" 
    reply-address="replyQueue" reply-queue="replyQueue"> 
    <rabbit:reply-listener/> 
</rabbit:template> 

<rabbit:listener-container 
    connection-factory="rabbitConnFactory" concurrency="10"> 
    <rabbit:listener ref="rabbitMessageListener" method="createIncident" 
     queue-names="rabbitQueue" /> 
</rabbit:listener-container> 

<bean id="rabbitMessageListener" class="com.my.listener.QueueListener" /> 

とリスナーこのリスナーのメッセージを処理します。そして、アプリケーションコンテナの表示メッセージ

2016年6月22日14:15:30457 ERROR [AbstractFaultChainInitiatorObserver:115]エラーはあきらめ、エラー 処理中に発生しました! org.apache.cxf.interceptor.Fault:No 'queue' が指定されています。 RabbitTemplateの設定を確認してください。 でorg.apache.cxf.service.invoker.AbstractInvoker.createFault(AbstractInvoker.java:170) でorg.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:136) org.apacheで org.apache.cxf.interceptor.ServiceInvokerInterceptor $ 1.runで.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:204)org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:101)で org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:272)で org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:94)で(ServiceInvokerInterceptor.java:58) でorg.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) でorg.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:249) org.apache.cxfで org.apache.cxf.transport.servletで.transport.servlet.ServletController.invokeDestination org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:222)で(ServletController.java:248) org.apache.cxf.transpoで.ServletController.invoke org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:171)で(ServletController.java:153) javax.servlet.http.HttpServlet.serviceでrt.servlet.AbstractHTTPServlet.handleRequest org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:209)で(AbstractHTTPServlet.java:289) (HttpServletを.java:650) org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:265) org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303) ATで で org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) でorg.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) org.apache.cでatalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain。Javaの:208) でcom.mobilitymedia.connected.http.core.LoggerFilter.doFilter(LoggerFilter.java:41) org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) での org.springframework.securityで org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:330) で org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) .web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:118) at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(Fil terSecurityInterceptor.java:84) でorg.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) でorg.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java: org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) で113) でorg.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:103) で org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.secu org.springframework.security.webで org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) でrity.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:113) 。 org.springframework.security.web.savedrequest.RequestCacheAwareFilterで org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) でservletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:154) 。 doFilter(RequestCacheAwareFilter.java:45) at org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainPro xy.java:342) でorg.springframework.security.web.authentication.preauth.Abs​​tractPreAuthenticatedProcessingFilter.doFilter(AbstractPreAuthenticatedProcessingFilter.java:94) でorg.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy。 Javaの:342) でorg.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:50) org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.javaで: 107) at org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframeworkで org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter(FilterChainProxy.java:342) でorg.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:87) org.springframework.web.filter.DelegatingFilterProxy.invokeDelegateで.security.web.FilterChainProxy.doFilterInternal org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160)で(FilterChainProxy.java:192) (DelegatingFilterProxy.java:344) at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy。Javaの:261) org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) で org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) で で組織 org.apache.catalina.authenticator.AuthenticatorBaseで.apache.catalina.core.StandardWrapperValve.invoke org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)で(StandardWrapperValve.java:220) .invoke(AuthenticatorBase.java:505) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103) でorg.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956) org.apache.catalina.coreで org.apache.coyote.http11.AbstractHttp11Processor.processで.StandardEngineValve.invoke org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)で(StandardEngineValve.java:116) (AbstractHttp11Processor.java :1078) でorg.apache.coyote.AbstractProtocol $ AbstractConnectionHandler.process(AbstractProtocol.java:625) でorg.apache.tomcat.util.net.JIoEndpoint $ SocketProcessor.run(JIoEndpoint.java:316) でjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) で org.apache.tomcat.utilで.threads.TaskThread $ WrappingRunnable.run(TaskThread.java:61) java.lang.Thread.run(Thread.java:745)で発生します。 org.springframework.amqp.AmqpIllegalStateException:いいえ 'キュー' が指定されています。 RabbitTemplateの設定を確認してください。 でorg.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:1514) でorg.springframework.amqp.rabbit.core.RabbitTemplate.receive(RabbitTemplate.java:802) コムで。 myService.IncidentServiceDefaultImpl.createIncident sun.reflect.DelegatingMethodAccessorImplでsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)で(IncidentServiceDefaultImpl.java:36)sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)で 。 (Involve)を使用するとInvoker.java:188) より ... 65あなたが使用する必要があります

+0

完全スタックトレースを送信します。 –

答えて

0

org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:104) で:

/** 
* The name of the default queue to receive messages from when none is specified explicitly. 
* 
* @param queue the default queue name to use for receive 
*/ 
public void setQueue(String queue) { 
    this.queue = queue; 
} 
reply-addressの代わりに

私はリスナーがrouting-key="camel"にバインドされたいくつかの異なるキューに見てみる必要があるため

そして、あなたの

queueTemplate.receiveAndReply("replyQueue", 

を再考らしいです。

まだ<binding>の混乱があります。

receiveAndReplyの代わりに消費者の部分に実際にListenerContainerを使用できるかどうかを確認することをお勧めします。

関連する問題