2017-03-23 11 views
0

私はRESTエンドポイントを持つとRabbitMQのブローカーにメッセージを挿入し、春ブーツ内のゲートウェイを実装しようとしているように構成されていません。私は、エラーを処理する必要があるので、私は「リスナー」としてそれをマークし、REPLYQUEUEを消費することができるように私RabbitTemplateでDLQ、およびSimpleMessageListenerContainerとreplyAddressを構成しました。春AMQPは動的RabbitTemplateとSimpleMessageListenerContainerを作成し、エラーRabbitTemplateはMessageListenerの

それは、「ハードコード化された」豆と正常に動作します:

@Bean 
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory); 
    template.setReceiveTimeout(0); 
    template.setReplyTimeout(10000); 
    template.setExchange("inputExchange"); 
    template.setRoutingKey("routing.1"); 
    template.setReplyAddress("replyQueue1"); 

    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); 
    DefaultClassMapper classMapper = new DefaultClassMapper(); 
    classMapper.setDefaultType(Event.class); 
    messageConverter.setClassMapper(classMapper); 
    template.setMessageConverter(messageConverter); 

    return template; 
} 

@Bean 
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("replyQueue1"); 
    container.setMessageListener(rabbitTemplate(connectionFactory)); 
    return container; 
} 

しかし、このゲートウェイの目標は、完全に設定可能ですので、ウサギ交換/キューへのすべてのルートをコーディングしません。例えば、私はYAMLにこの設定を

を持っている:

routes: 
    service1: 
    exchange: inputExchange 
    queue: inputQueue1 
    routing: routing.1 
    replyQueue: replyQueue1 
    dlExchange: reply.dlx1 
    dlQueue: dlx.queue1.reply 
    receiveTimeout: 0 
    replyTimeout: 10000 
    preProcessors: package.processor.LowercaseProcessor 
    postProcessors: package.processor.UppercaseProcessor 
    service2: 
    exchange: inputExchange 
    queue: inputQueue2 
    routing: routing.2 

だから私は動的に各サービスREPLYQUEUE、DLQ、用に構成する私のRabbitTemplateとSimpleMessageListenerContainerを作成する必要があります...

私はこのコードを試してみました:

@Configuration 
public class RabbitTemplatesConfiguration implements BeanFactoryAware { 

    @Autowired 
    private GatewayProperties properties; 
    @Autowired 
    private ConnectionFactory connectionFactory; 

    private BeanFactory beanFactory; 

    @Override 
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException { 
     this.beanFactory = beanFactory; 
    } 

    @PostConstruct 
    public void configure() { 
     Assert.state(beanFactory instanceof ConfigurableBeanFactory, "wrong bean factory type"); 
     ConfigurableBeanFactory configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; 

     Map<String, ServiceProperties> routes = properties.getRoutes(); 
     if (routes != null) { 
      for (String service : routes.keySet()) { 
       ServiceProperties props = routes.get(service); 
       createTemplate(configurableBeanFactory, service, props); 
      } 
     } 
    } 

    private void createTemplate(ConfigurableBeanFactory configurableBeanFactory, String service, ServiceProperties props) { 
     RabbitTemplate template = new RabbitTemplate(connectionFactory); 
     template.setExchange(props.getExchange()); 
     template.setRoutingKey(props.getRouting()); 
     template.setReplyAddress(props.getReplyQueue()); 

     template.setReceiveTimeout(props.getReceiveTimeout()); 
     template.setReplyTimeout(props.getReplyTimeout()); 

     Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); 
     DefaultClassMapper classMapper = new DefaultClassMapper(); 
     classMapper.setDefaultType(Event.class); 
     messageConverter.setClassMapper(classMapper); 
     template.setMessageConverter(messageConverter); 

     configurableBeanFactory.registerSingleton(service + "Template", template); 

     if(!StringUtils.isEmpty(props.getReplyQueue())) { 
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
      container.setQueueNames(props.getReplyQueue()); 
      container.setMessageListener(new MessageListenerAdapter(template)); 
      configurableBeanFactory.registerSingleton(service + "ListenerContainer", container); 
      container.afterPropertiesSet(); //added this but not working either 
      container.start(); //added this but not working either 
     } 
    } 
} 

が、私はREPLYQUEUEに応答を受信したときに、私はこのエラーがあります:

java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': replyQueue1 
    at org.springframework.util.Assert.state(Assert.java:70) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithFixed(RabbitTemplate.java:1312) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1251) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1218) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1189) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1156) 

したがって、SimpleMessageListenerContainerは正しくインスタンス化されていないようです。

は、問題が何であるかを知っていますか?

私のコードは、送信と受信:あなたが応答キューを使用している理由は明らかではない

@Autowired 
private ApplicationContext context; 
@Autowired 
private RabbitAdmin rabbitAdmin; 
@Autowired 
private GatewayProperties properties; 

@PostMapping("/{service}") 
public ResponseEntity<Object> call(@PathVariable("service") String service, @RequestBody Event body) { 
    ServiceProperties serviceProperties = properties.getRoutes().get(service); 

    Queue queue = QueueBuilder.durable(serviceProperties.getQueue()).build(); 
    rabbitAdmin.declareQueue(queue); 
    TopicExchange exchange = new TopicExchange(serviceProperties.getExchange()); 
    rabbitAdmin.declareExchange(exchange); 
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(serviceProperties.getRouting())); 

    Queue replyQueue = null; 
    if (!StringUtils.isEmpty(serviceProperties.getReplyQueue())) { 
     replyQueue = QueueBuilder.durable(serviceProperties.getReplyQueue()).withArgument("x-dead-letter-exchange", serviceProperties.getDlExchange()).build(); 
     rabbitAdmin.declareQueue(replyQueue); 
     Queue dlQueue = QueueBuilder.durable(serviceProperties.getDlQueue()).build(); 
     rabbitAdmin.declareQueue(dlQueue); 
     TopicExchange dlqExchange = new TopicExchange(serviceProperties.getDlExchange()); 
     rabbitAdmin.declareExchange(dlqExchange); 
     rabbitAdmin.declareBinding(BindingBuilder.bind(dlQueue).to(dlqExchange).with(serviceProperties.getReplyQueue())); 
    } 

    RabbitTemplate template = (RabbitTemplate) context.getBean(service + "Template"); 

    Event outputMessage = (Event) template.convertSendAndReceive(serviceProperties.getExchange(), serviceProperties.getRouting(), body, new CorrelationData(UUID.randomUUID().toString())); 

    //... 
} 

答えて

2

。 RabbitMQは、固定応答キューを使用する理由の大半を取り除く直接返信メカニズムを提供するようになりました(ただし、HA応答キューが必要な場合は例外です)。

つまり、問題はMessageListenerAdapterにテンプレートをラップしていることです。これは必要ではなく(とにかく動作しません)、テンプレートはMessageListenerを実装しています。

+0

それが動作Wahou!あなたの迅速な助けをありがとう。ある日、私はその上にいました:例えば、レスポンスのタイムアウトが発生した場合には、デッドレターキューが必要なので、これを行います。考えられるエラーを処理するためのより良い実装があると思いますか? –

+0

私はあなたが何を意味するか分かりません。 'replyTo'がどのように設定されているか(固定キュー、一時キュー、またはダイレクトreply-toを持つリスナーコンテナ)にかかわらず、タイムアウトの処理は同じです。 –

+0

私はデフォルトの直接返信する(amq.rabbitmq.reply-に)キューを使用している場合は、私が(何らかの理由で)、メッセージのでDLQを持って消費し、エラーではありませんが、バックキューに設定されており、 (メッセージが送信されるべきサードパーティのアプリケーションが接続されていないと想像してください)。ですから、私はDLQを処理可能ではないこれらのメッセージを格納する必要があり、replyQueue引数に設定するこの実装を見ました。より良いエラー管理はありますか? –

関連する問題