2016-08-09 25 views
1

上記のコードを使用して、複数のプロデューサが同じExchangeに異なるルーティングキーで送信した複数のメッセージを消費し、各メッセージをデータベースに挿入できました。バッチでメッセージを消費する - RabbitMQ

しかし、メッセージが次々にDBに挿入されるため、リソースの消費量が多すぎます。だから私はバッチインサートのために行くことを決めた。BasicQos

BasicQosでメッセージの制限を10に設定した後、私の予想はConsole.WriteLineが10個のメッセージを書き込む必要があるが期待どおりではないということだ。

私の期待は、キューからN個のメッセージを消費して一括挿入を行うことで、成功にここにはACK

は、私が使用するコードの一部ではない他のACK送信します。

using (var connection = factory.CreateConnection()) 
{ 
    using (var channel = connection.CreateModel()) 
    { 
     channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A"); 
     channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B"); 

     channel.BasicQos(0, 10, false); 

     var consumer = new EventingBasicConsumer(channel); 
     channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer); 

     consumer.Received += (model, ea) => 
     { 
      try 
      { 
       var body = ea.Body; 
       var message = Encoding.UTF8.GetString(body); 

       // Insert into Database 

       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
       Console.WriteLine(" Recevier Ack " + ea.DeliveryTag); 
      } 
      catch (Exception e) 
      { 
       channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true); 
       Console.WriteLine(" Recevier No Ack " + ea.DeliveryTag); 
      } 
     }; 

     Console.ReadLine(); 
    } 
} 

答えて

3

BasicQos = 10クライアントは一度に10のメッセージを取得することを意味するが、あなたはそれを消費するときには、常に一つのメッセージの時間が表示されます。 はここで読む:https://www.rabbitmq.com/consumer-prefetch.html

AMQPは(別名「プリフェッチ数」) を消費するときに、チャネル(または接続)に認められていないメッセージの数 を制限することができるようにbasic.qos方法を指定します。

あなたの範囲では、メッセージをダウンロードして一時的なリストの中に入れ、DBに挿入する必要があります。

、あなたが使用することができます。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true); 

無効basicAck()

パラメータ: deliveryTag - 受信 AMQP.Basic.GetOkまたはAMQP.Basic.Deliver

からのタグ

複数 - 肯定応答するためにはtrue 提供された配信タグまでのすべてのメッセージ。 falseに を送ってください。

final List<String> myMessagges = new ArrayList<String>(); 
     channel.basicConsume("my_queue", false, new DefaultConsumer(channel) { 

      @Override 
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 
       myMessagges.add(new String(body)); 
       System.out.println("Received..."); 

       if (myMessagges.size() >= 10) { 
        System.out.println("insert into DB..."); 
        channel.basicAck(envelope.getDeliveryTag(), true); 
        myMessagges.clear(); 
       } 


      } 
     }); 
+0

感謝。ここでは "ea"の範囲は、消費者のイベントです。受信したので、すべてのメッセージをDBに挿入した後、どのようにACKを行うことができますか? –

+0

さて、各メッセージに対して 'basicAck'を実行せず、' multiple = true'を使って各xメッセージを実行してください。 – Gabriele

+0

私はそれを正しく理解していればconsumer.Receivedの範囲内にあります。 'であれば、ACKはBasicQosで設定された10個のメッセージすべてを消費した後にのみ送信されます。再び気にしてすみません。いくつかのコードサンプルを私に与えることができれば役に立ちます。前もって感謝します! –

関連する問題