2017-07-29 15 views
1

ここには単純な.netコア1.1コンソールアプリケーションがあります。 -rパラメータを指定して呼び出すと、rabbitmq Queueのすべてのメッセージが読み込まれ、他のパラメータとともに呼び出され、各パラメータがメッセージとしてエンキューされます。.NET Core 1.1 RabbitMQメッセージを受信

ここで問題はありますが、メッセージを正しくエンキューできますが、メッセージを読み取ろうとするとすべてのメッセージが読み取られません。明らかに、私はキューを正しく消費していないので、いくつかのガイダンスに感謝します。

ありがとうございます!

using System; 
using System.Collections.Generic; 
using System.Text; 
using Newtonsoft.Json; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace RabbitMqDemo 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var client = new MessagingClient(); 
      if (args.Length == 1 && args[0].ToLower() == "-r") 
      { 
       Console.WriteLine("Reading Messages from Queue."); 
       var messages = client.ReceiveMessages(); 
       Console.WriteLine($"Read {messages.Length} message(s) from queue."); 
       foreach(var msg in messages) 
        Console.WriteLine(msg); 
      } 
      else 
      { 
       foreach (var msg in args) 
       { 
        client.SendMessage(msg); 
       } 
       Console.WriteLine($"Enqueued {args.Length} Message."); 
      } 
     } 
    } 

    internal class MessagingClient 
    { 
     private readonly ConnectionFactory connectionFactory; 
     private string ExchangeName => "defaultExchange"; 
     private string RoutingKey => ""; 
     private string QueueName => "Demo"; 

     private string HostName => "localhost"; 


     public MessagingClient() 
     { 
      this.connectionFactory = new ConnectionFactory {HostName = this.HostName}; 
     } 

     public void SendMessage(string message) 
     { 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        var properties = this.SetMessageProperties(channel, message); 

        string messageJson = JsonConvert.SerializeObject(message); 
        var body = Encoding.UTF8.GetBytes(messageJson); 

        channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body); 
       } 
      } 
     } 

     public string[] ReceiveMessages() 
     { 
      var messages = new List<string>(); 
      using (var connection = this.connectionFactory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        this.QueueDeclare(channel, this.QueueName); 
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 


        var consumer = new EventingBasicConsumer(channel); 
        consumer.Received += (model, ea) => 
        {       
         string bodystring = Encoding.UTF8.GetString(ea.Body); 
         messages.Add(bodystring); 

         // ReSharper disable once AccessToDisposedClosure 
         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
        }; 
        channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer); 
       } 
      } 
      return messages.ToArray(); 
     } 

     private void QueueDeclare(IModel channel, string queueName) 
     { 
      channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct, 
       durable: true, 
       autoDelete: false, 
       arguments: null); 

      var queueDeclared = channel.QueueDeclare(queue: queueName, 
       durable: true, 
       exclusive: false, 
       autoDelete: false, 
       arguments: null); 

      channel.QueueBind(queueName, ExchangeName, RoutingKey);    
     } 

     private IBasicProperties SetMessageProperties(IModel channel, object message) 
     { 
      var properties = channel.CreateBasicProperties(); 
      properties.ContentType = "application/json"; 
      properties.Persistent = true; 
      return properties; 
     } 

    } 
} 

答えて

1
  • まず、あなたの交換・キューが正しく設定されていることと、メッセージがそれに公開されていることを確認するために、管理UIを使用します。
  • 第2に、ReceiveMessages()であるため、イベントが発生する前に空の配列が返される可能性があります。コンシューマがRabbitMQからメッセージを受け取っている間、待つコードはありません。 the tutorialにはどのようにConsole.ReadLine()が使用されていることに注意してください。あなたの例では、同期オブジェクト(ManualResetEvent)を使用して、特定のメッセージ数が読み取られるまでReceiveMessages()が返されないようにすることができます。
+0

男私は愚かだと感じますか?もちろん、実行スレッドをブロックしません。私はいくつかのブロッキングロジックを追加し、それはチャンピオンのように動作します。ありがとうルーク! – cdarrigo

+0

うれしい –

関連する問題