2016-09-16 8 views
1

RabbitMQを使用して1つの要求と1つの応答キューを使用して分散RPCのようなソリューションを実装しようとしています。 Apache Apolloと私はRabbitMQに移行できるようにしたいと思っていました。ここで重要なポイントは以下のとおりです。RabbitMQ:ルーティングを使用してメッセージを選択する

  • 各サーバは、要求キューに接続しているアポロのための私の実装では(ヘッダフィールド)

を彼のためにすることになっている

  • 各サーバプロセスの要求だけはキーポイントは、セレクタの使用(ヘッダフィールドの値の句のようなwhere節のようなもの)だったので、これはRabbitMQでRoutingとRoutingキーを使って達成されたと思ったが、間違っていなければならない。 。

    問題を再現するためにRoutingサンプル(http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html)を修正しました。私は、routingKeyを定義するさまざまなパラメータとコンシューマのいずれかのメッセージを生成するプロデューサから開始できる2つのコンシューマを持っています。私が見ている行動は、メッセージの消費がランダムであるように見えるということです(メッセージは「John」が初めて「John」の消費者によって処理され、消費者によって「Mary」が2回目に処理されます)。

    誰にもRabbitMQでセレクタを使用する際の表示やコードスニペットは?

    public static void Main(String[] args) 
    { 
        var factory = new ConnectionFactory { HostName = "localhost" }; 
        using (var connection = factory.CreateConnection()) 
         using (var channel = connection.CreateModel()) 
         { 
          const String request = "request"; 
          channel.ExchangeDeclare(request, "direct"); 
    
          channel.QueueDeclare(request, true, false, false, null); 
    
          if (args.Length < 1) 
          { 
           Console.WriteLine(" Press [enter] to exit."); 
           Console.ReadLine(); 
           Environment.ExitCode = 1; 
           return; 
          } 
    
          var myRoutingKey = args[0]; 
          channel.QueueBind(request, request, myRoutingKey); 
    
          Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}."); 
    
          var consumer = new EventingBasicConsumer(channel); 
          consumer.Received += (model, ea) => 
          { 
           var body = ea.Body; 
           var message = Encoding.UTF8.GetString(body); 
           var routingKey = ea.RoutingKey; 
           Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); 
          }; 
          channel.BasicConsume(request, true, consumer); 
    
          Console.WriteLine(" Press [enter] to exit."); 
          Console.ReadLine(); 
         } 
    } 
    

    とプロデューサーのために:事前に

    public static void Main(String[] args) 
    { 
        var factory = new ConnectionFactory { HostName = "localhost" }; 
        using (var connection = factory.CreateConnection()) 
         using (var channel = connection.CreateModel()) 
         { 
          const String request = "request"; 
          channel.ExchangeDeclare(request, "direct"); 
    
          channel.QueueDeclare(request, true, false, false, null); 
    
          var routingKey = args.Length > 0 ? args[0] : "John"; 
    
          const String message = "Hi"; 
          var body = Encoding.UTF8.GetBytes(message); 
          channel.BasicPublish(request, routingKey, null, body); 
          Console.WriteLine($" [x] Sent '{routingKey}':'{message}'"); 
         } 
    
        Console.WriteLine(" Press [enter] to exit."); 
        Console.ReadLine(); 
    } 
    

    おかげで、消費者のための私のコードの下

  • 答えて

    0

    これがなぜあなたにとって役に立たなかったのか推測できます。キーはあなたの消費者のこれらの2つの行です。

    「リクエスト」は消費者の「すべて」のキューの名前です。複数のルーティングキーを使用して複数のバインディングを設定するこのプログラムを実行すると、「request」という名前のキューは複数のルーティングキー(例:「John」、「Mary」)を使用してExchangeにバインドされます。このバインディングを実行すると、バインディングはRabbitMQサーバーで一時的ではなく、それらが固執することに注意してください。

    あなたの問題を解決する方法に戻る。複数のオプションがありますが、ここにその1つがあります。まず、RabbitMQ Modelを読むことをお勧めします。

    var queueName = channel.QueueDeclare().QueueName; 
    channel.QueueBind(queueName, request, myRoutingKey); 
    

    しかし、上記の新しいキューをご希望との交流に作成してバインドされている、あなたの消費者のプログラムを実行するたびに意味:

    あなたは代わりにあなたのこれらの行を持っている同じチュートリアルのコードを使用することができますルーティングキー。別の方法としては、前と同じコードを使用することですが、固定キュー名の代わりにキュー名を適切に選択するだけです。たとえば、あなたはキー

    var queueName = myRoutingKey ; 
    channel.QueueDeclare(queueName, true, false, false, null); 
    channel.QueueBind(queueName, request, myRoutingKey); 
    

    をルーティングするごとに1つのキューを持っているか、することができます代わりに、あなたのチュートリアルのサンプルに似た単一のキューにグループのルーティングキーの数をすることができます。

    要点は、1つのキューでは実行できないことです。 (あなたがそれらを消費するときにメッセージをフィルタリングする以外)。しかし、それはあなたにとって本当の必要条件のようには聞こえませんでした。あなたが尋ねたのは、各コンシューマサーバーは、このモデルで実行できる関連メッセージのみを処理するということです。プロデューサーはその交換所にのみ公開します(これはあなたが望むものです)。

    +0

    こんにちはアミン、あなたの答えに感謝します。また、動的に生成されたキューを使用すると問題が発生しないこともわかりました。とにかく、パフォーマンス上の懸念から、また以前に他のブローカーと一緒に実装していたソリューションがそのようなものだったので、リクエストと応答キューの両方で解決策を探していました。後者は問題ではなく、いくつかの再設計が必要ですが、パフォーマンスを確認する必要があります。 – Leon

    +0

    また、動的に生成されたキューを使用することによって、ルーティングコンシューマが実行されていない間に生成されたプロデューサからのすべてのメッセージが失われます。私はまた交換を耐久性あると宣言しようとしましたが、それは助けにはなりません。 – Leon

    +0

    動的に生成されたキューは、私が想定しているキューの名前を失うため、基本的には一時的なキューになります。ダイナミックキュー名は、一貫性のある名前を使用するだけの解決策になるとは思わない(実際にセレクタとして考える)。このようにしても、パフォーマンスは低下しません。これはRabbitMQが設計したものです。 –

    関連する問題