同じメッセージを受信するように2つのサービスが設定されています。 ConsumerA : IConsumer<IMessageA>
およびConsumerB : IConsumer<IMessageA>
。各サービスは固有のエンドポイントを設定します。 queue_a
およびqueue_b
を入力し、そのコンシューマを登録します。私はRabbitMQでファンアウトタイプのIMessageAを交換して、queue_a
とqueue_b
に結びついているのを見ています。ここまでは順調ですね。MassTransit:複数のコンシューマー、別々のキュー/エンドポイント、配信されないメッセージ
- 私は両方のサービスを実行してメッセージを公開しますが、サービスAだけがそれを取得します。
- サービスAを停止し、RabbitMQのBにメッセージを手動で公開します(サービスAは
IMessageA
をIRequestClient<IMessageA, IMessageAResponse>
を使用してPOSTに応答して公開するWebサービスです。なぜ私は手動で投稿する必要があるのですか)期待どおりに消費します。
明確にするサービスAを停止して、メッセージがqueue_a
とqueue_b
両方にのRabbitMQによってルーティングされます。サービスAが実行されている場合は、メッセージはqueue_b
がIMessageA
交流にバインドされていると絶対にそれらを取得する必要があることを示す既存の交換のバインディングにもかかわらず、queue_b
に行きます。または、管理用ウェブUIを通じてRabbitMQを調べることができる頃には、queue_b
にメッセージが届いたという証拠はありません(すなわち、queue_b_error
またはqueue_b_skipped
には何も存在しません。
サービスAとBの両方にIReceiveObserver
を追加しましたが、何も起動していないのはReceiveFault
またはConsumeFault
です。
サービスAでの消費者は基本的にやっている:
はvar result = await MethodThatReturnsIMessageAResponse(messageA);
context.Respond(result);
なぜサービスは、サービスBへのメッセージの配信を妨害していますか?どこから見始めますか?特定enpoint(例えばキュー名)を必要と
c.Resolve<IBus>().CreateRequestClient<IToDoMessage, IToDoMessageResponse>(new Uri(QueueAddress + QueueName),TimeSpan.FromSeconds(5));
を:
バインディングを確認しましたか?これは常に機能しますが、これは基本的な機能です。このAサービスで例外をスローして、メッセージがエラーキューに送られるかどうかを確認しましたか? –
また、サービスAが停止してメッセージを公開すると、キューに来ると言います。サービスを開始すると、これらのメッセージはどうなりますか?それらはキューから消え、消費されたことを意味しますか? –
サービスAが停止しているときに、RabbitMQの 'IMessageA'エクスチェンジに直接メッセージを公開すると、' queue_a'と 'queue_b'の両方に配信され、サービスBはメッセージを受信して消費することができます。その後、サービスAを起動すると、メッセージは処理され、消費されます。これは、サービスAが実行されていて、メッセージが 'queue_b'に到達していないときだけです。あなたは私の質問の最初の段落にチェックして、バインディングが正しいと思われることがわかります。なぜそれが問題なのかを迷っています。 –