RabbitMQを使用して1つの要求と1つの応答キューを使用して分散RPCのようなソリューションを実装しようとしています。 Apache Apolloと私はRabbitMQに移行できるようにしたいと思っていました。ここで重要なポイントは以下のとおりです。RabbitMQ:ルーティングを使用してメッセージを選択する
- 各サーバは、要求キューに接続しているアポロのための私の実装では(ヘッダフィールド)
を彼のためにすることになっている
問題を再現するためにRoutingサンプル(http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html)を修正しました。私は、routingKeyを定義するさまざまなパラメータとコンシューマのいずれかのメッセージを生成するプロデューサから開始できる2つのコンシューマを持っています。私が見ている行動は、メッセージの消費がランダムであるように見えるということです(メッセージは「John」が初めて「John」の消費者によって処理され、消費者によって「Mary」が2回目に処理されます)。
誰にもRabbitMQでセレクタを使用する際の表示やコードスニペットは?
public static void Main(String[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
const String request = "request";
channel.ExchangeDeclare(request, "direct");
channel.QueueDeclare(request, true, false, false, null);
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
var myRoutingKey = args[0];
channel.QueueBind(request, request, myRoutingKey);
Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};
channel.BasicConsume(request, true, consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
とプロデューサーのために:事前に
public static void Main(String[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
const String request = "request";
channel.ExchangeDeclare(request, "direct");
channel.QueueDeclare(request, true, false, false, null);
var routingKey = args.Length > 0 ? args[0] : "John";
const String message = "Hi";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(request, routingKey, null, body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
おかげで、消費者のための私のコードの下
。
こんにちはアミン、あなたの答えに感謝します。また、動的に生成されたキューを使用すると問題が発生しないこともわかりました。とにかく、パフォーマンス上の懸念から、また以前に他のブローカーと一緒に実装していたソリューションがそのようなものだったので、リクエストと応答キューの両方で解決策を探していました。後者は問題ではなく、いくつかの再設計が必要ですが、パフォーマンスを確認する必要があります。 – Leon
また、動的に生成されたキューを使用することによって、ルーティングコンシューマが実行されていない間に生成されたプロデューサからのすべてのメッセージが失われます。私はまた交換を耐久性あると宣言しようとしましたが、それは助けにはなりません。 – Leon
動的に生成されたキューは、私が想定しているキューの名前を失うため、基本的には一時的なキューになります。ダイナミックキュー名は、一貫性のある名前を使用するだけの解決策になるとは思わない(実際にセレクタとして考える)。このようにしても、パフォーマンスは低下しません。これはRabbitMQが設計したものです。 –