2017-02-14 22 views
0

私はRabbit MQを使ってメッセージング用のアプリケーションを開発しています。私はEventingBasicConsumer,交換QueueBindを使用しています。テストのために、メッセージを受信するアプリケーションを実行してから、イーサネットコントローラをオフにします。私は受信メッセージを再びオンにした後、約500のメッセージがだったので、が失われました。 マイコード:RabbitMQ EventingBasicConsumerはメッセージを失う

private void DoWork() 
    { 
     try 
     { 
      var connection = ConnectionFactory.CreateConnection(); 
      IModel model = connection.CreateModel(); 

      // Configure the Quality of service for the model. Below is how what each setting means. 
      // BasicQos(0="Dont send me a new message untill I’ve finshed", 1= "Send me one message at a time", false ="Apply to this Model only") 
      model.BasicQos(0, 1, false); 

      model.ExchangeDeclare(Options.RabbitConnectionOptions.Exchange, RabbitConstants.ExchangeType, RabbitConstants.ExchangeDurable, RabbitConstants.ExchangeAutoDelete); 
      var queueDeclareOk = model.QueueDeclare("SomeSubsruberQueue3", RabbitConstants.QueueDurable, RabbitConstants.QueueExclusive, RabbitConstants.ExchangeAutoDelete); 


      var queueName = queueDeclareOk.QueueName; 

      foreach (var optionsBindingKey in Options.BindingKeys) 
      { 
       Logger.Debug($"QueueBind for {nameof(optionsBindingKey)}={optionsBindingKey}"); 
       model.QueueBind(queueName, Options.RabbitConnectionOptions.Exchange, optionsBindingKey); 
      } 


      var consumer = new EventingBasicConsumer(model); 

      consumer.Received += (ch, ea) => 
      { 
       try 
       { 
        var body = ea.Body; 
        var message = Encoding.UTF8.GetString(body); 
        var routingKey = ea.RoutingKey; 
        var messageToLog = $" [x] Received '{routingKey}':'{message}'";       

        Logger.Info(messageToLog); 
        Console.WriteLine($"receavedCount={++receavedCount}"); 

       } 
       catch (Exception e) 
       { 
        Console.WriteLine(e); 
        throw; 
       } 
      }; 

      model.BasicConsume(queueName, RabbitConstants.QueueAutoAck, consumer); 
     } 
     catch (Exception e) 
     { 
      Logger.Error(e); 
      Console.WriteLine(e); 
      throw; 
     } 
    } 

がどのように私はメッセージを失う防ぐことができますか?

p.s.ここでログには、失われたメッセージが表示されます。

は私が変更した場合::

model.BasicConsume(queueName,true, consumer); 

にuが見ることができるように "ID" が926から1299へのidのとマッサージが

2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="922" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="923" Name="1c" /> 
2017-02-14 10:52:01.4916 <Root><Subscribers><Subscriber Id="924" Name="1c" /> 
2017-02-14 10:52:01.5056 <Root><Subscribers><Subscriber Id="925" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1300" Name="1c" /> 
2017-02-14 10:52:22.5606 <Root><Subscribers><Subscriber Id="1301" Name="1c" /> 
2017-02-14 10:52:22.5676 <Root><Subscribers><Subscriber Id="1302" Name="1c" /> 
2017-02-14 10:52:22.6046 <Root><Subscribers><Subscriber Id="1303" Name="1c" /> 

UPD失われたことを、シーケンシャルであります

model.BasicConsume(queueName,false, consumer); 

明示的ASK

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

私は非常に奇妙な行動を持っている:

2017-02-14 13:06:35.9835| DeliveryTag=23, <Subscriber Id="778" Name="1c" /> 
2017-02-14 13:06:36.0295| DeliveryTag=24, <Subscriber Id="779" Name="1c" /> 
2017-02-14 13:06:57.3285| DeliveryTag=26, <Subscriber Id="782" Name="1c" /> 
2017-02-14 13:06:57.3755| DeliveryTag=27, <Subscriber Id="783" Name="1c" /> 

DeliveryTag = 25はありません!

+0

ルース=そのかわいそう、失う= ..... – BugFinder

答えて

2

自動ACKを使用しないでください。メッセージを読んだり、何かしたいことをしたり、それを済ませたら明示的にACKを返します。

0

Ок、感謝エミールVikström、私は、ASK、明示的に使用する必要があります。

model.BasicConsume(queueName,false, consumer); 

とメッセージを処理した後にお願いします:失われたメッセージについて

((EventingBasicConsumer)ch).Model.BasicAck(ea.DeliveryTag, false); 

その他の質問:それを返すときのRabbitMQは、メッセージの並べ替えをキュー。だから、それだけではなく、遅く受信されるでしょう

関連する問題