2011-10-31 7 views
9

メッセージを受信するサブスクライバがないことに関連するのではなく、「耐久性」と「永続モード」がリブートに関連しているように見えます。サブスクライバなしのRabbitMQキュー

私は、RabbitMQが購読者がいないときにメッセージをキューに保持したいと考えています。加入者がオンラインになると、メッセージはその加入者によって受信されるべきである。これはRabbitMQで可能ですか?

コードサンプル:

サーバー:

namespace RabbitEg 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        //channel.ExchangeDelete(EXCHANGE_NAME); 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 
        //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn); 

        for (int i = 0; i < 100; i++) 
        { 
         byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i); 
         IBasicProperties channelProps = channel.CreateBasicProperties(); 
         channelProps.SetPersistent(true); 

         channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad); 

         Console.WriteLine("Sent Message " + i); 
         System.Threading.Thread.Sleep(25); 
        } 

        Console.ReadLine(); 
       } 
      } 
     } 
    } 
} 

クライアント:

namespace RabbitListener 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 

        string queueName = channel.QueueDeclare("myQueue", true, false, false, null); 
        channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld"); 

        Console.WriteLine("Waiting for messages"); 

        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
        channel.BasicConsume(queueName, true, consumer); 

        while (true) 
        { 
         BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
         Console.WriteLine(Encoding.ASCII.GetString(e.Body)); 
        } 
       } 
      } 
     } 
    } 
} 

答えて

12

durablepersistentが何を意味するかの説明についてはAMQP Referenceを参照してください。

キューは、durableまたはnon-durableのいずれかです。前者はブローカーの再開を生き延び、後者はやりません。

メッセージはまたはpersistentのいずれかとして公開されています。考え方は、persistentキュー上のメッセージは、ブローカ再起動時にも生き残るはずです。

したがって、あなたは1)durableというキューを宣言し、2)メッセージをpersistentとして公開する必要があります。また、パブリッシャーがをチャンネルで確認できるようにすることもできます。そうすれば、ブローカーがいつメッセージの責任を負うかを知ることができます。

+0

ありがとう、私はこれを試しましたが、クライアントがリッスンしていない場合でもメッセージは保持されません。質問に添付されたコードサンプル。 –

+0

良いコード。 2つの問題:1)サーバもキューを宣言しなければならない。それを2回宣言することは問題ではなく、それは良い方法であり、2)queueDeclare()はあなたに匿名の耐久性のないキューを与えます。 queueDeclare( "myQueue"、true、false、false、null)が必要です。 – scvalex

+0

また、質問を編集した方法によって、達成しようとしていることを理解することが難しくなります。 – scvalex

関連する問題