2017-11-02 16 views
0

私は、データプロデューサからデータを受け取り、処理する必要があるアプリケーションを作成する必要があります。私はメッセージブローカーとしてRabbitMQを選びました。私のテストでは、最良の結果ではないことが分かりました。RabbitMQ遅い受信速度

送信 - 100 msg;
プロデュース - 100 msg/s;
消費 - 6 msg/s;

これを解決するには、listenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE); を入力してください。しかし、いくつかのキューについては確認が必要です。また、データ処理のためにはメッセージの順序が重要であるため、ワーカーを使用して並列処理することはできません。

受信の速度を上げることは可能ですか?

プロデューサー:

@Bean 
Queue queue() { 
    return new Queue(queueName, false); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip"); 
    connectionFactory.setUsername("name"); 
    connectionFactory.setPassword("pswd"); 
    return connectionFactory; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
public FanoutExchange exchange() { 
    return new FanoutExchange("exchange-1"); 
} 

@Bean 
public Binding binding(){ 
    return BindingBuilder.bind(queue()).to(exchange()); 
} 

...

rabbitTemplate.setExchange("exchange-1"); 
rabbitTemplate.convertAndSend(data); 

消費者:

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("ip"); 
    connectionFactory.setUsername("name"); 
    connectionFactory.setPassword("pswd"); 
    return connectionFactory; 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
Queue queue() { 
    return new Queue("queue-1", false); 
} 

@Bean 
public SimpleMessageListenerContainer listenerContainer() { 
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); 
    listenerContainer.setConnectionFactory(connectionFactory()); 
    listenerContainer.setQueues(queue()); 
    listenerContainer.setMessageListener(new Receiver()); 
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); 
    return listenerContainer; 
} 

...

@Override 
public void onMessage(Message message) { 
    System.out.println("Received message: " + fromBytes(message.getBody()) + " \n Time = " + System.currentTimeMillis()); 
} 

例では、2つのvCPUと4 GBのメモリでテストされています。

答えて

0

コンテナのprefetchCountを大きくすると、パフォーマンスが大幅に向上します。ただし、メッセージを拒否して再キューした場合、順序は失われます(キューに入れられたメッセージはプリフェッチされたメッセージの背後にあります)。

関連する問題