2016-11-28 17 views
0

私はSpringでのメッセージ処理にかなり新しいので、私に同行してください。SpringのRabbitMQメッセージの同時処理

私のRabbitMQメッセージハンドラは、複数のスレッドで同時にメッセージを処理したいと思います。

@Component 
public class ConsumerService { 

    @RabbitListener(queues = {"q"}) 
    public void messageHandler(@Payload M msg) { 
     System.out.println(msg);   
    } 

} 

... 

@Configuration 
@Import({MessageConverterConfiguration.class, ConsumerService.class}) 
public class ConsumerConfiguration { 

    @Autowired 
    private ConnectionFactory connectionFactory; 

    @Bean 
    public List<Declarable> declarations() { 
     return Arrays.asList(
      new DirectExchange("e", true, false), 
      new Queue("q", true, false, false), 
      new Binding("q", Binding.DestinationType.QUEUE, "e", "q", null) 
     ); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(MessageConverter contentTypeConverter, SimpleRabbitListenerContainerFactoryConfigurer configurer) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConcurrentConsumers(10); 
     configurer.configure(factory, connectionFactory); 
     factory.setMessageConverter(contentTypeConverter); 
     return factory; 
    } 
} 

私の小さなテストでは、キュー "q"に4つのメッセージがあります。私はそれらをすべて処理します。それは結構です。しかし、私はそれを一つずつ処理します。 "ConsumerService.messageHandler"(本質的にメッセージの処理の完了を遅らせる)にブレークポイントを設定すると、そのブレークポイントに4つのスレッドがあることになります。しかし、私は決して複数のスレッドを持っていません。メッセージの処理を完了するとすぐに、次のメッセージが処理されます。メッセージを同時に処理するには何が必要ですか?

答えて

0

申し訳ありませんが、私はそれが動作していることを書くのを忘れていました。基本的に私が今持っていることは次のとおりです。

... 
factory.setConcurrentConsumers(10); 
factory.setMaxConcurrentConsumers(20); 
factory.setConsecutiveActiveTrigger(1); 
factory.setConsecutiveIdleTrigger(1); 
factory.setPrefetchCount(100); 
... 

は、私はそれが実際に最終的に(十分な負荷の下で)並行してメッセージを処理するだけではconcurrentConsumersと信じています。問題は私の小さなテストでわずか4つのメッセージしか持っていなかったので、それ以上の消費者(スレッド)を起動することは決してありません。ここでは、continuousActiveTriggerを1に設定すると役立ちます。 prefetchCountにも何か言いたいことがあります。とにかく、ケースが閉まった。

1

あなたの消費者でmessae処理を処理するためにスレッドプールを使用するか、この

  • を達成するための二つの方法があります。
  • または、複数のコンシューマーを作成します。

は、私はあなたがプロパティが自動的に春AMQPすることで、複数の消費者を作るの取り扱いconcurrentConsumersを使用していました。 PrefetchCountを1に設定し、MaxConcurrentConsumersも設定してください。

ほとんどの場合、すでにキューに4つのメッセージがあり、デフォルト値Prefetch Countが大きいため、1つのコンシューマがキューにあるすべてのメッセージを消費しています。

関連する問題