2016-12-22 10 views
0

私はRabbitMQ .NETクライアントを使用しています。ネットワークが切断されると、私たちのサービスはメッセージを失います。 テストアプリケーションを作成し、 "BasicAcks"イベントを使用して切断が発生したときに確認応答を受け取らなかったすべてのメッセージを再送信しようとしましたが、まだメッセージが失われています。切断はConnectionShutdownイベントで検出されます(ReplyCode "451"を探しています)。 受信したメッセージを確認するには、すべてのメッセージを消費し、少なくとも0から49999までのすべての数字を含むコンテンツを読んでください。 ネットワークが安定しているときは完全に動作しています。不安定なネットワークをシミュレートすると(ネットワークアダプターを無効にする)、数百のメッセージが失われることがあります。ここでネットワーク損失時のRabbitMQでのメッセージ公開の保証

はコードです:

private static ConcurrentQueue<byte[]> sendQueue = new ConcurrentQueue<byte[]>(); 
private static ConcurrentDictionary<ulong, byte[]> waitingForAck = new ConcurrentDictionary<ulong, byte[]>(); 
private static bool stop; 

private static void Main() 
{ 
    var server = "192.168.1.123"; 
    var userName = "rabbitmq"; 
    var password = "rabbitmq"; 
    var sendCount = 50000; 
    try 
    { 
     Task.Run(() => Send(server, userName, password, "TestExchange", sendCount)); 

     Task.Run(() => 
     { 
      while (true) 
      { 
       Console.WriteLine("Total sent:{0}", totalPackages.Count); 
       Console.WriteLine("Packages waiting in send queue:{0}", sendQueue.Count); 
       Console.WriteLine("Packages waiting for ack:{0}", waitingForAck.Count); 
       Console.WriteLine(); 

       if (stop) 
       { 
        break; 
       } 
       Thread.Sleep(1000); 
      } 
     }); 

     Console.ReadLine(); 

     stop = true; 
    } 
    catch (Exception exception) 
    { 
     Console.WriteLine("Exception: {0}", exception.Message); 
    } 

    Console.ReadLine(); 
} 

public static void Send(string server, string userName, string password, string exchangeName, int sendCount) 
{ 
    for (int i = 0; i < sendCount; i++) 
    { 
     var content = String.Format("Hello World: {0}", i); 
     var data = Encoding.UTF8.GetBytes(content); 
     sendQueue.Enqueue(data); 
    } 

    var factory = new ConnectionFactory { HostName = server, UserName = userName, Password = password }; 
    factory.AutomaticRecoveryEnabled = true; 
    using (var connection = factory.CreateConnection()) 
    { 
     connection.ConnectionShutdown += Connection_ConnectionShutdown; 

     using (var channel = connection.CreateModel()) 
     { 
      channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); 

      channel.ConfirmSelect(); 

      channel.BasicAcks += Channel_BasicAcks; 

      while (!stop) 
      { 
       byte[] data; 
       if (!sendQueue.TryDequeue(out data)) 
       { 
        Thread.Sleep(100); 
        continue; 
       } 
       if (data == null) 
       { 
        continue; 
       } 
        var publishTag = channel.NextPublishSeqNo; 

        try 
        { 
         if (!waitingForAck.TryAdd(publishTag, data)) 
         { 
          Console.WriteLine("Cannot prepare {0}", publishTag); 
         } 

         channel.BasicPublish(exchangeName, string.Empty, null, data); 
         totalPackages.Enqueue(data); 
        } 
        catch (Exception) 
        { 
         byte[] ignored; 
         if (!waitingForAck.TryRemove(publishTag, out ignored)) 
         { 
          Console.WriteLine("cannot delete - exception"); 
         } 
         Console.WriteLine("Requeue {0}", publishTag); 
         sendQueue.Enqueue(data); 
         Thread.Sleep(1000); 
         continue; 
        } 

      } 

      while (waitingForAck.Count > 0) 
      { 
       Thread.Sleep(1000); 
       Console.WriteLine("Waiting for missing acks"); 
      } 
     } 
    }    
} 

private static void Channel_BasicAcks(object sender, BasicAckEventArgs e) 
{ 
    byte[] ignored; 
    if (e.Multiple) 
    { 
     var ids = waitingForAck.Keys.Where(x => x <= e.DeliveryTag).ToArray(); 
     foreach (var id in ids) 
     { 
      if (!waitingForAck.TryRemove(id, out ignored)) 
      { 
       Console.WriteLine("cannot delete {0}", id); 
      } 
     } 
    } 
    else 
    { 
     if (!waitingForAck.TryRemove(e.DeliveryTag, out ignored)) 
     { 
      Console.WriteLine("cannot delete {0}", e.DeliveryTag); 
     } 
    } 
} 

private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) 
{ 
    if (e.ReplyCode == 541) 
    { 
     var temp = waitingForAck.Values.ToList(); 
     waitingForAck.Clear(); 
     Console.WriteLine("Connection lost, requeue {0} messages", temp.Count); 
     foreach (var message in temp) 
     { 
      sendQueue.Enqueue(message); 
     } 
    } 
} 

誰かが私が間違ってやっているものを私に伝えることができますか? this例のように

channel.QueueDeclare(queue: "hello", durable: true); 

+0

投稿されたコードは、実際のRabbitMQコードではなく、テストハーネスです。あなたのRabbitMQコード( 'Send(...)'メソッドの内容)を投稿するのに役立ちます –

+0

コードをスクロールするだけで、そこにすべてあります。 –

答えて

0

問題は送信部ではなく、受信部でした。私は間違いを各メッセージの末尾に 'i'をチェックしましたString.Format("Hello World: {0}", i);

上記のコードは正常に動作しています、すべてのメッセージは少なくとも1回送信されます。

0

あなたは耐久性のあるキューを宣言する必要があります。

+0

Exchangeにバインドされた永続キューがすでに存在し、すべてのメッセージを受信して​​います。私が言ったように、それはネットワーク接続が安定しているとき正常に動作しています.. –

関連する問題