私はC#/ .NETのRabbitMQでトピックベースルーティングの概念証明を行っています(EasyNetQライブラリを使用)。私のテストでは、トピックルーティング( "TopicA"と "TopicB")によって2つの耐久性のあるキューにバインドされた単一のエクスチェンジがあります。ここEasyNetQを使用して複数のRabbitMQキューを購読する
生産(C#のコンソールアプリケーション)のためのコードである:ここ
using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
Random random = new Random();
Foo foo; // my test message class
for (int i = 0; i < 100; i++)
{
int coin = random.Next(0, 2);
if (coin == 0)
{
foo = new Foo() { Payload = "Heads" };
bus.Publish(foo, "TopicA");
Console.WriteLine($"Published message {i} to TopicA.");
}
else
{
foo = new Foo() { Payload = "Tails" };
bus.Publish(foo, "TopicB");
Console.WriteLine($"Published message {i} to TopicB.");
}
}
}
は自分の消費者コード(また、C#のコンソールアプリケーション)である:
class Program
{
static void Main(string[] args)
{
TestRabbitMQSubscribe();
Console.ReadKey(false);
}
private static void TestRabbitMQSubscribe()
{
using (var bus = RabbitHutch.CreateBus("host=xxx;username=xxx;password=xxx"))
{
bus.Subscribe<Foo>("TopicA", HandleFooA, config => config.WithTopic("TopicA"));
bus.Subscribe<Foo>("TopicB", HandleFooB, config => config.WithTopic("TopicB"));
}
}
private static void HandleFooA(Foo foo)
{
Console.WriteLine($"Received {foo.Payload} from TopicA.");
File.AppendAllText(@"c:\heads.txt", foo.Payload + Environment.NewLine);
}
private static void HandleFooB(Foo foo)
{
Console.WriteLine($"Received {foo.Payload} from TopicB.");
File.AppendAllText(@"c:\tails.txt", foo.Payload + Environment.NewLine);
}
}
プロデューサコードがで実行されTopicBキューには53のメッセージが含まれていて、TopicAキューには47のメッセージが含まれていることをRabbitMQ管理者UIを使用して確認できます。
次に、コンシューマーコードを実行すると、TopicAキュー内のすべてのメッセージをプルして、適切なファイルに書き込むように見えます。しかし、それはTopicBキュー内のほんの一握りのメッセージを受信するか、まったく受信しません。その後、Console.ReadKey()
コールで停止します。
bus.Subscribe()
コールの順序を逆にすると、TopicBではなくTopicBからのメッセージが引き出されます。
私は単純なもの(例えばブロッキングコール)が欠落しているか、根本的にいくつかのRabbitMQまたはEasyNetQの概念を誤解しているように感じます。
メッセージがすべて生成されるとすぐに、プロデューサの 'using'ブロックが終了するように見えます。これにより、待ち行列が死ぬことができます。 –
これで運がいい? –
@MarkLarter、私はそれを再訪する機会はなかったが、あなたの答えに感謝する - それは私が調査する最初の道だ。 –