2017-02-27 4 views
0

同じメッセージを受信するように2つのサービスが設定されています。 ConsumerA : IConsumer<IMessageA>およびConsumerB : IConsumer<IMessageA>。各サービスは固有のエンドポイントを設定します。 queue_aおよびqueue_bを入力し、そのコンシューマを登録します。私はRabbitMQでファンアウトタイプのIMessageAを交換して、queue_aqueue_bに結びついているのを見ています。ここまでは順調ですね。MassTransit:複数のコンシューマー、別々のキュー/エンドポイント、配信されないメッセージ

  • 私は両方のサービスを実行してメッセージを公開しますが、サービスAだけがそれを取得します。
  • サービスAを停止し、RabbitMQのBにメッセージを手動で公開します(サービスAはIMessageAIRequestClient<IMessageA, IMessageAResponse>を使用してPOSTに応答して公開するWebサービスです。なぜ私は手動で投稿する必要があるのですか)期待どおりに消費します。

明確にするサービスAを停止して、メッセージがqueue_aqueue_b両方にのRabbitMQによってルーティングされます。サービスAが実行されている場合は、メッセージqueue_bIMessageA交流にバインドされていると絶対にそれらを取得する必要があることを示す既存の交換のバインディングにもかかわらず、queue_bに行きます。または、管理用ウェブUIを通じてRabbitMQを調べることができる頃には、queue_bにメッセージが届いたという証拠はありません(すなわち、queue_b_errorまたはqueue_b_skippedには何も存在しません。

サービスAとBの両方にIReceiveObserverを追加しましたが、何も起動していないのはReceiveFaultまたはConsumeFaultです。

サービスAでの消費者は基本的にやっている:

var result = await MethodThatReturnsIMessageAResponse(messageA); 
context.Respond(result); 

なぜサービスは、サービスBへのメッセージの配信を妨害していますか?どこから見始めますか?特定enpoint(例えばキュー名)を必要と

c.Resolve<IBus>().CreateRequestClient<IToDoMessage, IToDoMessageResponse>(new Uri(QueueAddress + QueueName),TimeSpan.FromSeconds(5)); 

を:

+0

バインディングを確認しましたか?これは常に機能しますが、これは基本的な機能です。このAサービスで例外をスローして、メッセージがエラーキューに送られるかどうかを確認しましたか? –

+0

また、サービスAが停止してメッセージを公開すると、キューに来ると言います。サービスを開始すると、これらのメッセージはどうなりますか?それらはキューから消え、消費されたことを意味しますか? –

+0

サービスAが停止しているときに、RabbitMQの 'IMessageA'エクスチェンジに直接メッセージを公開すると、' queue_a'と 'queue_b'の両方に配信され、サービスBはメッセージを受信して​​消費することができます。その後、サービスAを起動すると、メッセージは処理され、消費されます。これは、サービスAが実行されていて、メッセージが 'queue_b'に到達していないときだけです。あなたは私の質問の最初の段落にチェックして、バインディングが正しいと思われることがわかります。なぜそれが問題なのかを迷っています。 –

答えて

0

問題は、私が使用して「出版」のメッセージだったということでした。代わりに、私はCreatePublishRequestClientを使用するために必要な:

ない特定のキューに、交流を通じて公開するバスを使用し、行く
c.Resolve<IBus>().CreatePublishRequestClient<IToDoMessage, IToDoMessageResponse>(TimeSpan.FromSeconds(5)); 

GitHub sample projectが前者を示すのに役立ちません...

関連する問題