私は、データプロデューサからデータを受け取り、処理する必要があるアプリケーションを作成する必要があります。私はメッセージブローカーとしてRabbitMQを選びました。私のテストでは、最良の結果ではないことが分かりました。RabbitMQ遅い受信速度
送信 - 100 msg;
プロデュース - 100 msg/s;
消費 - 6 msg/s;
これを解決するには、listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
を入力してください。しかし、いくつかのキューについては確認が必要です。また、データ処理のためにはメッセージの順序が重要であるため、ワーカーを使用して並列処理することはできません。
受信の速度を上げることは可能ですか?
プロデューサー:
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public FanoutExchange exchange() {
return new FanoutExchange("exchange-1");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange());
}
...
rabbitTemplate.setExchange("exchange-1");
rabbitTemplate.convertAndSend(data);
消費者:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip");
connectionFactory.setUsername("name");
connectionFactory.setPassword("pswd");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
Queue queue() {
return new Queue("queue-1", false);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory());
listenerContainer.setQueues(queue());
listenerContainer.setMessageListener(new Receiver());
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return listenerContainer;
}
...
@Override
public void onMessage(Message message) {
System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis());
}
例では、2つのvCPUと4 GBのメモリでテストされています。