2016-11-30 10 views
1

メッセージをエラーキューから元のキューに戻そうとしています。 これを行うには、エラーキューにコンシューマを作成し、それを必要なキューに公開しました。 これを試してみると、Consumed Messagesの半分が公開されますが、残りの半分はError_Skipped Queueに送信されます。MassTransit RabbitMQエラーキューの消費メッセージの半分をError_Skippedキューに移動する

私は多くのことを成功させずに試してきました。だから、おそらく私が紛失している単純なものです。

は、ここに私のコードのサンプルです:

public class ClaimsMessage 
{ 
    public string Description { get; set; } 

    public DateTime Date { get; set; } 

    public bool Handled { get; set; } 
} 

public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>> 
{ 
    public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context) 
    { 
     try 
     { 
      await context.Publish<ClaimsMessage>(context.Message.Message); 

     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

public static IBusControl CreateClaimsErrorConsumerBus(string endPoint) 
{ 
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
    { 
     var host = cfg.Host(new Uri("rabbitmq://localhost/"), h => 
     { 
      h.Username("guest"); 
      h.Password("guest"); 
     }); 

     cfg.ReceiveEndpoint(host, endPoint, e => 
     { 
      e.Consumer(() => new ClaimsMessageErrorConsumer()); 
     }); 
    }); 
    return busControl; 
} 
+0

[シャベル?](https://www.rabbitmq.com/shovel.html)を使用したことがありますか – stuartd

+0

私はシャベルを見ました。しかし、少し基本的です。 ルールを追加して、特定のメッセージだけを後で移動し、残りの部分を後で移動できるようにしたいと考えています。 –

+0

[メーリングリスト](https://groups.google.com/forum/#!forum/masstransit-discuss)を試すことがあります – stuartd

答えて

1

あなたが戻って処理キューにエラー・キューからメッセージを移動する場合、あなたはPublishを呼ぶべきではない - これは、すべての加入者にメッセージを再送信します。すでにキュー名がわかっているので、メッセージを直接キューに戻してください。あなたが見ているのは、エラーキューにコンシューマを作成したことです。エラーキューには、そのメッセージの交換バインディングが作成されています。メッセージの忠実性が保持されるように、元のメッセージのヘッダーをコピーし、余分な信用のために

sbc.ReceiveEndpoint("input_error", x => 
{ 
    // this prevents extra message bindings from being created 
    x.BindMessageExchanges = false; 

    x.Consumer<MyMover>(() => new MyMover(inputQueueAddress); 
}); 

public class MyMover : 
    IConsumer<ClaimsMessage> 
{ 
    public async Task Consume(ConsumeContext<ClaimsMessage> context) 
    { 
     try 
     { 
      var endpoint = await context.GetSendEndpoint(_inputQueueAddress); 
      await endpoint.Send<ClaimsMessage>(context.Message); 
     } 
     catch (Exception e) 
     { 
      string error = e.Message; 
     } 
    } 
} 

ので、代わりにこれを行います。

+0

返信いただきありがとうございます。 残念ながら、それは同じことを行い、1つのメッセージは要求キューに置かれ、もう1つのメッセージはClaims_error_skippedキューに入れられます。 私はヘッダーを介してコピーしませんでした。それがそれを引き起こしていると思いますか? –

+0

こんにちはクリス。 私はここでダウンロードできるサンプルプロジェクトを作成しました: https://drive.google.com/file/d/0B0FYiKs0DMyrYTJfZTcxdVlKSDg/view?usp=sharing[link] これは、私がそれをどのように複製したかに関するステップバイステップの説明が含まれています。 私は何かが欠けている可能性がありますが、私は非常に広範にドキュメントを読んでいます。 –

+0

エラーではなく、エラーキューから読み取っている場合は元のメッセージを消費する必要があります。基本的に、障害キューのエラーキューに追加のコンシューマバインディングを作成しました。これは、キューから移動された元のメッセージに加えて、公開されたフォルトを取得することになります。代わりにTを消費し、RMQのバインディングをクリーンアップすると、すべて設定されます。 –

関連する問題