2016-05-06 7 views
0

RabbitMQを通じてSpringブートとRPCで動作するチュートリアルがいくつかあります。しかし、ジャクソンのJSONメッセージコンバータを追加しようとするとすぐに、それはすべて落ちてしまいます。SpringブートとSpring AMQP RPC - 例外を変換するコンバータが見つかりません

リモート呼び出しがサーバーによって正常に受信されたため、クライアントの構成ではないと確信しています。

Exchange DATAFLOW_EXCHANGE 
Routing Key  dataflowRunner 
Redelivered  ○ 
Properties 
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw== 
priority: 0 
delivery_mode: 2 
headers:  
__TypeId__: org.springframework.remoting.support.RemoteInvocation 
content_encoding: UTF-8 
content_type: application/json 
Payload 
675 bytes 
Encoding: string 


{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]} 

ただし、以下の例外が出力されています、代引き、それはそれだけでコンバータを見つけることができない、それはdw.dataflow.Dataflowオブジェクトでなければなりませんので

Caused by: org.springframework.messaging.converter.MessageConversionException: 
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage 
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}] 
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118) 
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98) 
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) 
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) 
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) 
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112) 
... 12 common frames omitted 

知っています。しかし、私はどこにでも私のコンバータを定義しています。ここで

サーバー構成

@Configuration 
@EnableRabbit 
public class RabbitListenerConfiguration { 
    @Autowired 
    ConnectionFactory connectionFactory; 
    @Autowired 
    ObjectMapper  jacksonObjectMapper; 

@Bean 
public TopicExchange exchange() { 
    return new TopicExchange("DATAFLOW_EXCHANGE", true, false); 
} 

@Bean 
public Queue queue() { 
    return new Queue("DATAFLOW_QUEUE", true); 
} 

@Bean 
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() { 
    AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ; 
    exporter.setAmqpTemplate(rabbitTemplate()); 
    exporter.setMessageConverter(jackson2JsonMessageConverter()); 
    exporter.setServiceInterface(DataflowRunner.class); 
    exporter.setService(dataflowRunner()); 
    return exporter ; 
} 

@Bean 
public DataflowRunner dataflowRunner() { 
    return new DataflowRunnerServerImpl(); 
} 

@Bean 
public MessageConverter jackson2JsonMessageConverter() { 
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    converter.setJsonObjectMapper(jacksonObjectMapper); 
    return converter; 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory); 
    template.setMessageConverter(jackson2JsonMessageConverter()); 
    return template; 
} 


@Bean(name="rabbitListenerContainerFactory") 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setMessageConverter(jackson2JsonMessageConverter()); 
    factory.setDefaultRequeueRejected(false); 
    return factory; 
} 

は、サービス・インターフェースである:

public interface DataflowRunner { 
    String run(Dataflow dataflow) throws Exception; 
} 

そして、具体的な実装:にやにや笑いと笑いのために

public class DataflowRunnerServerImpl implements DataflowRunner { 
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE") 
public String run(Dataflow dataflow) throws Exception { 
    // SNIP 
} 

、私はまた、サーバーの実装を設定しようとしました次のアノテーションを持つクラスですが、同じエラーが発生します:

@RabbitHandler 
@RabbitListener(
     bindings = @QueueBinding(key = "dataflowRunner", 
       value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"), 
       exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic"))) 
public String run(Dataflow dataflow) throws Exception { 

クライアント構成

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort); 
    connectionFactory.setUsername(rabbitUser); 
    connectionFactory.setPassword(rabbitPassword); 
    connectionFactory.setAddresses(rabbitAddresses); 
    return connectionFactory; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(jackson2MessageConverter()); 
    return template; 
} 

は何も間違って設定思えますか?私は何が欠けていますか?私は、コンバーターをサービス輸出業者とリスナーコンテナ工場に設定しました。

ご意見やご感想をお寄せください。

+0

、これらのチュートリアルを共有してください。 'impl'を提供する必要はありません。 'AmqpProxyFactoryBean'はあなたのためのものです。あなたはちょっと混じった懸念があります:http://docs.spring.io/spring-amqp/reference/html/_reference.html#remoting –

答えて

4

@RabbitListenerは、サービスエクスポータでは使用できません。これは単純なJavaクラスです。

RPCによるSpring Remotingの場合、サービスエクスポータはSimpleMessageListenerContainerの場合はMessageListenerです。

@RabbitListenerには、pojoメソッドをラップする特別なリスナーアダプタがあります。

あなたは2つの異なるパラダイムを混ぜているようです。

ServiceExporter(Spring Remoting)は、クライアントサイドのAmqpProxyFactoryBeanとペアになり、サーバー側のリスナーとしてサービスエクスポータが関連付けられていることが予想されます。

単純なPOJO RPC(RabbitMQよりもSpring Remotingを使用するよりも新しい)では、クライアント側で@RabbitListenerRabbitTemplate.convertSendAndReceive()を使用します。 PFBとSEを取り除く。

ドキュメントに説明を追加する必要がある場合に備えて、このパスを導いた理由を説明できますか。

EDIT

あなたは春Remotingを使用したいです場合(クライアント側のインターフェイスを注入し、それが「魔法」サーバー側でサービスを起動する必要があり)、あなたは取り除く必要がありますすべてのコンテナ工場のものとSimpleMessageListenerContainerを結び、MessageListenerとしてサービス輸出業者を注入してください。

リファレンスマニュアルはan XML exampleですが、@BeanとしてSMLCを配線することができます。

EDIT2

トップレベルのオブジェクトがRemoteInvocationあるので、私はJSONでは動作しませんAMQPを超えるいくつかのテストと春Remotingを実行している - メッセージコンバータは、そのオブジェクトを再作成することができますが、それはありませんがあります実際の引数に関する情報を入力すると、リンクされたハッシュマップとして残ります。

今のところ、JSONを使用する必要がある場合、@RabbitListenerと組み合わせてconvertSendAndReceiveというテンプレートがここに移動する方法です。 JSONでSpring Remoting RPCを使用して対処できるかどうかを確認するためにJIRAの問題を開きますが、実際にはJava Serialization用に設計されています。

+0

私が探していたものです。私はこの同じ問題を抱えていましたが、Spring AMQPとSpring Remotingとコードを読んでいる時間がありました。JSONは実際の値を変換せず、ラッパーオブジェクトだけを変換するので、JSONはサポートされません。これは残念ですが、私はこの機能を使用してJSONを生成する既存のサービスを公開したいと考えていました。私の現在のDTOはシリアライズ可能ではないので、通常のコンバーターだけを使用することはできません。 –

0

私はこれに数分を費やしましたが、私はうまくいかない恐ろしいハックで問題を解決することができます。

私は基本的に両方の側で呼び出しに含まれるクラスを拡張して、内部の引数と値がJSON文字列に/から変換されるようにしました。

もう少し愛があれば、これは他のコンバータを使って他のデータ型で動作するように改善することができますが、そのための時間はありませんでした。

まず、サーバー側でそれを

:-)試してみる勇気ある場合、私はあなたにそれを残して、私はJSONオブジェクトへ/からの変換のサポートを追加できるようにAmqpInvokerServiceExporterをサブクラス化。最初のステップは、JSONのメソッド引数を対応する型に変換することです。 2番目のステップは、オブジェクトから返された値を対応するJSON文字列に変換して返します。このクラスが定義されて

public class JSONAmqpInvokerServiceExporter extends AmqpInvokerServiceExporter { 

    private final ObjectMapper objectMapper = new ObjectMapper(); 

    @Override 
    public void onMessage(Message message) { 
     Address replyToAddress = message.getMessageProperties().getReplyToAddress(); 
     if (replyToAddress == null) { 
      throw new AmqpRejectAndDontRequeueException("No replyToAddress in inbound AMQP Message"); 
     } 

     Object invocationRaw = getMessageConverter().fromMessage(message); 

     RemoteInvocationResult remoteInvocationResult; 
     if (invocationRaw == null || !(invocationRaw instanceof RemoteInvocation)) { 
      remoteInvocationResult = new RemoteInvocationResult(
       new IllegalArgumentException("The message does not contain a RemoteInvocation payload")); 
     } 
     else { 
      RemoteInvocation invocation = (RemoteInvocation) invocationRaw; 
      int argCount = invocation.getArguments().length; 
      if (argCount > 0) { 
       Object[] arguments = invocation.getArguments(); 
       Class<?>[] parameterTypes = invocation.getParameterTypes(); 
       for (int i = 0; i < argCount; i++) { 
        try { 
         //convert arguments from JSON strings to objects 
         arguments[i] = objectMapper.readValue(arguments[i].toString(), parameterTypes[i]); 
        } 
        catch (IOException cause) { 
         throw new MessageConversionException(
          "Failed to convert JSON to value: " + arguments[i] + " of type" + parameterTypes[i], cause); 
        } 
       } 
      } 

      remoteInvocationResult = invokeAndCreateResult(invocation, getService()); 
     } 
     send(remoteInvocationResult, replyToAddress); 
    } 

    private void send(RemoteInvocationResult result, Address replyToAddress) { 
     Object value = result.getValue(); 
     if (value != null) { 
      try { 
       //convert the returning value from a model to a JSON string 
       //before we send it back 
       Object json = objectMapper.writeValueAsString(value); 
       result.setValue(json); 
      } 
      catch (JsonProcessingException cause) { 
       throw new MessageConversionException("Failed to convert value to JSON: " + value, cause); 
      } 
     } 
     Message message = getMessageConverter().toMessage(result, new MessageProperties()); 

     getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), message); 
    } 

} 

今、私はこのような何かに私のサービスリスナーの定義を変更:私はResultInvocationValueは常になります知っているので、この場合には、通常のAmqTemplateを使用

<bean id="toteServiceListener" class="amqphack.FFDAmqpInvokerServiceExporter"> 
    <property name="serviceInterface" value="ampqphack.ToteService"/> 
    <property name="service" ref="defaultToteService"/> 
    <property name="amqpTemplate" ref="rabbitTemplate"/> 
</bean> 

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="toteServiceListener" queue-names="tote-service"/> 
</rabbit:listener-container> 

とにかくJSON文字列に変換されるので、InvocationResultが従来のJavaシリアル化を使用してシリアル化されているかどうかは気にしません。私は物事に変更しなければならなかったクライアントはクライアント側

。まず、呼び出しの際に渡す引数はすべてJSON文字列に変換する必要がありますが、引き続きパラメータ型を保持しています。幸運なことに、既存のAmqpProxyFactoryBeanremoteInvocationFactoryパラメータを受け取り、そこで呼び出しを代行して変更することができます。だから私は、最初に定義され、新しいRemoteInvocationFactory

public class JSONRemoteInvocationFactory implements RemoteInvocationFactory { 

    private final ObjectMapper mapper = new ObjectMapper(); 

    @Override 
    public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { 
     RemoteInvocation invocation = new RemoteInvocation(methodInvocation); 
     if (invocation.getParameterTypes() != null) { 
      int paramCount = invocation.getParameterTypes().length; 
      Object[] arguments = new Object[paramCount]; 
      try { 
       for (int i = 0; i < paramCount; i++) { 
        arguments[i] = mapper.writeValueAsString(invocation.getArguments()[i]); 
       } 
       invocation.setArguments(arguments); 
      } 
      catch (JsonProcessingException cause) { 
       throw new RuntimeException(
        "Failed converting arguments to json: " + Arrays.toString(invocation.getArguments()), cause); 
      } 
     } 
     return invocation; 
    } 
} 

しかし、それは十分ではありません。結果が返ってくると、その結果をJavaオブジェクトに戻す必要があります。このために、サービスインタフェースの期待リターンタイプを使用することができます。そしてこれに対して、私は存在をAmqpProxyFactoryBeanに拡張し、その結果を、私が常にStringであると知っているものをJavaモデルに変換するだけです。

public class JSONAmqpProxyFactoryBean extends AmqpProxyFactoryBean { 

    private final ObjectMapper mapper = DefaultObjectMapper.createDefaultObjectMapper(); 

    @Override 
    public Object invoke(MethodInvocation invocation) throws Throwable { 
     Object ret = super.invoke(invocation); 
     return mapper.readValue(ret.toString(), invocation.getMethod().getReturnType()); 
    } 

} 

そして、これで、私は多少このように私のクライアント側を定義することができました:

<bean id="toteService" class="amqphack.JSONAmqpProxyFactoryBean"> 
    <property name="amqpTemplate" ref="rabbitTemplate"/> 
    <property name="serviceInterface" value="amqphack.ToteService"/> 
    <property name="routingKey" value="tote-service"/> 
    <property name="remoteInvocationFactory" ref="remoteInvocationFactory"/> 
</bean> 

そして、この後すべてが魅力のように働いた:

ToteService toteService = context.getBean("toteService", ToteService.class); 
ToteModel tote = toteService.findTote("18251", "ABCD"); 

私は伝統的なコンバータを変更しないので、例外がまだ正しくシリアル化されていることを意味します。InvocationResult

0

これはまだ必要かどうかわかりませんが、これはAmqpProxyFactoryBean/AmqpInvokerServiceExporterでJSONを使用するための問題を解決した方法です。クライアント側ではJackson2JsonMessageConverterコンバータを使用し、サーバ側ではRemoteInvocationAwareMessageConverterAdapterJackson2JsonMessageConverterコンバータをラップします。

ClientConfig.java

import com.stayfriends.commons.services.interfaces.GameService; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.beans.factory.FactoryBean; 
import org.springframework.beans.factory.InitializingBean; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class ClientConfig { 

    @Bean 
    public RabbitTemplate gameServiceTemplate(ConnectionFactory connectionFactory, 
               Jackson2JsonMessageConverter messageConverter) { 
     RabbitTemplate template = new RabbitTemplate(connectionFactory); 
     template.setExchange("rpc"); 
     template.setMessageConverter(messageConverter); 
     return template; 
    } 

    @Bean 
    public ServiceAmqpProxyFactoryBean gameServiceProxy2(@Qualifier("gameServiceTemplate") RabbitTemplate template) { 
     return new ServiceAmqpProxyFactoryBean(template); 
    } 


    public static class ServiceAmqpProxyFactoryBean implements FactoryBean<Service>, InitializingBean { 
     private final AmqpProxyFactoryBean proxy; 

     ServiceAmqpProxyFactoryBean(RabbitTemplate template) { 
      proxy = new AmqpProxyFactoryBean(); 
      proxy.setAmqpTemplate(template); 
      proxy.setServiceInterface(GameService.class); 
      proxy.setRoutingKey(GameService.class.getSimpleName()); 
     } 

     @Override 
     public void afterPropertiesSet() { 
      proxy.afterPropertiesSet(); 
     } 

     @Override 
     public Service getObject() throws Exception { 
      return (Service) proxy.getObject(); 
     } 

     @Override 
     public Class<?> getObjectType() { 
      return Service.class; 
     } 

     @Override 
     public boolean isSingleton() { 
      return proxy.isSingleton(); 
     } 
    } 

} 

ServerConfig.java

import org.springframework.amqp.core.*; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; 
import org.springframework.amqp.rabbit.listener.MessageListenerContainer; 
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter; 
import org.springframework.amqp.support.converter.RemoteInvocationAwareMessageConverterAdapter; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class ServerConfig { 

    @Bean 
    public DirectExchange serviceExchange() { 
     return new DirectExchange("rpc"); 
    } 

    @Bean 
    public Queue serviceQueue() { 
     return new Queue(Service.class.getSimpleName()); 
    } 

    @Bean 
    public Binding binding(@Qualifier("serviceQueue") Queue queue, @Qualifier("serviceExchange") Exchange exchange) { 
     return BindingBuilder.bind(queue).to(exchange).with(Service.class.getSimpleName()).noargs(); 
    } 

    @Bean("remoteInvocationAwareMessageConverter") 
    @Primary 
    public RemoteInvocationAwareMessageConverterAdapter remoteInvocationAwareMessageConverterAdapter(
     Jackson2JsonMessageConverter jsonMessageConverter) { 
     return new RemoteInvocationAwareMessageConverterAdapter(jsonMessageConverter); 
    } 

    @Bean 
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, ServiceImpl service, 
               RemoteInvocationAwareMessageConverterAdapter messageConverter) { 
     AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter(); 
     exporter.setAmqpTemplate(template); 
     exporter.setService(service); 
     exporter.setServiceInterface(Service.class); 
     exporter.setMessageConverter(messageConverter); 
     return exporter; 
    } 

    @Bean 
    public MessageListenerContainer container(ConnectionFactory connectionFactory, 
               @Qualifier("serviceQueue") Queue queue, 
               AmqpInvokerServiceExporter exporter) { 
     DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory); 
     container.setQueues(queue); 
     container.setMessageListener(exporter); 
     container.setConsumersPerQueue(5); 
     return container; 
    } 
} 
関連する問題