2017-05-02 22 views
1

私はC#/ .NETのRabbitMQでトピックベースルーティングの概念証明を行っています(EasyNetQライブラリを使用)。私のテストでは、トピックルーティング( "TopicA"と "TopicB")によって2つの耐久性のあるキューにバインドされた単一のエクスチェンジがあります。ここEasyNetQを使用して複数のRabbitMQキューを購読する

生産(C#のコンソールアプリケーション)のためのコードである:ここ

using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx")) 
{ 
    Random random = new Random(); 
    Foo foo; // my test message class 

    for (int i = 0; i < 100; i++) 
    { 
     int coin = random.Next(0, 2); 

     if (coin == 0) 
     { 
      foo = new Foo() { Payload = "Heads" }; 
      bus.Publish(foo, "TopicA"); 
      Console.WriteLine($"Published message {i} to TopicA."); 
     } 
     else 
     { 
      foo = new Foo() { Payload = "Tails" }; 
      bus.Publish(foo, "TopicB"); 
      Console.WriteLine($"Published message {i} to TopicB."); 
     } 
    } 
} 

は自分の消費者コード(また、C#のコンソールアプリケーション)である:

class Program 
{ 
    static void Main(string[] args) 
    { 
     TestRabbitMQSubscribe(); 
     Console.ReadKey(false); 
    } 

    private static void TestRabbitMQSubscribe() 
    { 
     using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx")) 
     { 
      bus.Subscribe<Foo>("TopicA", HandleFooA, config => config.WithTopic("TopicA")); 
      bus.Subscribe<Foo>("TopicB", HandleFooB, config => config.WithTopic("TopicB")); 
     } 
    } 

    private static void HandleFooA(Foo foo) 
    { 
     Console.WriteLine($"Received {foo.Payload} from TopicA."); 
     File.AppendAllText(@"c:\heads.txt", foo.Payload + Environment.NewLine); 
    } 

    private static void HandleFooB(Foo foo) 
    { 
     Console.WriteLine($"Received {foo.Payload} from TopicB."); 
     File.AppendAllText(@"c:\tails.txt", foo.Payload + Environment.NewLine); 
    } 
} 

プロデューサコードがで実行されTopicBキューには53のメッセージが含まれていて、TopicAキューには47のメッセージが含まれていることをRabbitMQ管理者UIを使用して確認できます。

次に、コンシューマーコードを実行すると、TopicAキュー内のすべてのメッセージをプルして、適切なファイルに書き込むように見えます。しかし、それはTopicBキュー内のほんの一握りのメッセージを受信するか、まったく受信しません。その後、Console.ReadKey()コールで停止します。

bus.Subscribe()コールの順序を逆にすると、TopicBではなくTopicBからのメッセージが引き出されます。

私は単純なもの(例えばブロッキングコール)が欠落しているか、根本的にいくつかのRabbitMQまたはEasyNetQの概念を誤解しているように感じます。

+0

メッセージがすべて生成されるとすぐに、プロデューサの 'using'ブロックが終了するように見えます。これにより、待ち行列が死ぬことができます。 –

+0

これで運がいい? –

+0

@MarkLarter、私はそれを再訪する機会はなかったが、あなたの答えに感謝する - それは私が調査する最初の道だ。 –

答えて

1

これはコードのほんの一部を修正したもので、ほとんどはProducerの終了を防ぐためにReadLine()をどこに置くかで動作しているようです。実際の実装では、プロデューサをアクティブにしてキューの耐久性を確保する他のメカニズムがあります。

主な考え方は、コンシューマのサブスクリプションがキューからすべてを読み込む前にプロデューサの公開接続を終了しないことです。

ソリューションのスタートアッププロジェクトとしてプロデューサとコンシューマの両方を設定して同時に実行することも、プロデューサを実行することもできます(Enterキーは押さないでください)。その後コンシューマを実行することができます。プロデューサーusingブロックが範囲外にならない限り、すべて良好で耐久性があります。その全体が

投稿コードは、念のため:

プロデューサー:

using System; 
using EasyNetQ; 
using Messages; 

namespace Producer 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz")) 
      { 
       var random = new Random(); 
       for (var i = 1; i <= 100; ++i) 
       { 
        var coin = random.Next(0, 2); 
        if (coin == 0) 
        { 
         bus.Publish(new CoinFlipMessage { Payload = "Heads" }, CoinFlipMessage.HeadsTopic); 
         Console.WriteLine($"Published message {i} to {CoinFlipMessage.HeadsTopic}"); 
        } 
        else 
        { 
         bus.Publish(new CoinFlipMessage { Payload = "Tails" }, CoinFlipMessage.TailsTopic); 
         Console.WriteLine($"Published message {i} to {CoinFlipMessage.TailsTopic}."); 
        } 
       } 
       Console.ReadLine(); 
      } 
     } 
    } 
} 

消費者:

using System; 
using System.IO; 
using EasyNetQ; 
using Messages; 

namespace Consumer 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      using (var bus = RabbitHutch.CreateBus("host=zzz;username=zzz;password=zzz")) 
      { 

       bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.HeadsTopic, HandleHeads, config => config.WithTopic(CoinFlipMessage.HeadsTopic)); 
       bus.Subscribe<CoinFlipMessage>(CoinFlipMessage.TailsTopic, HandleTails, config => config.WithTopic(CoinFlipMessage.TailsTopic)); 
       Console.ReadLine(); 
      } 
     } 

     private static void HandleHeads(CoinFlipMessage message) 
     { 
      if (message == null) return; 
      headsCount++; 
      var payload = message.Payload; 
      Console.WriteLine($"Received {payload} {headsCount} from {CoinFlipMessage.HeadsTopic}."); 
      File.AppendAllText(@"heads.txt", payload + Environment.NewLine); 
     } 

     private static void HandleTails(CoinFlipMessage message) 
     { 
      if (message == null) return; 
      tailsCount++; 
      var payload = message.Payload; 
      Console.WriteLine($"Received {payload} {tailsCount} from {CoinFlipMessage.TailsTopic}."); 
      File.AppendAllText(@"tails.txt", payload + Environment.NewLine); 
     } 

     private static int headsCount; 
     private static int tailsCount; 
    } 
} 

メッセージ:

using System;

namespace Messages 
{ 
    public class CoinFlipMessage 
    { 
     public string Payload { get; set; } 

     public static string HeadsTopic = "TopicHeads"; 
     public static string TailsTopic = "TopicTails"; 
    } 
} 
関連する問題