2013-11-21 13 views
6

私はエラーキューを処理することなく、再試行制限を超えた直後に、失敗したメッセージをログする良い解決策を探しています。私がこれまでに見つけた何を:私はInMemoryInboundMessageTrackerから継承してオーバーライドすることができますmasstransitで失敗したメッセージを記録するには?

  • をIsRetryLimitExceededが、そこにこの時点IDを除くメッセージそのものについての情報はありませんで。
  • 私はIInboundMessageInterceptorを実装し、プリ/ PostDispatchIConsumeContextを得るが、そこにこの時点で成功に関する情報/失敗することはできません。

だから、解決策として、私はをpreDispatchIConsumeContextを得ることができますキャッシュのいくつかの並べ替えに入れ、その後におけるキャッシュからそれを取得し、再試行制限を超えたとき IsRetryLimitExceeded。

方法は、このような順序で呼び出され:IsRetryLimitExceeded - >をpreDispatch - > PostDispatch

だから私はキャッシュから正常に処理されたメッセージを削除するのに適した場所を見つけることができません。

もちろん、制限されたサイズのキャッシュを使用できますが、この全体的な解決策は奇妙なようです。

この問題に関するご意見はありがとうございます。

答えて

2

独自のメッセージ再試行追跡をバスに実装して構成することができます。その結果、失敗したメッセージは実装に渡されます。デフォルトの再試行トラッカーに委任し、イベントを傍受して対処することも、必要に応じて独自の再試行トラッキングを実装することもできます。

MessageTrackerFactoryは設定用の代理人ですが、私はこのインターフェースが近くにあると思います。

+0

うんでこれらのクラスをプラグインする、それがIsRetryLimitExceededをオーバーライド/実装の場合です。 – amstix

+0

_IInboundMessageTracker_インターフェイスのものです。メッセージに関する情報はなく、messageId以外のコンテキストを消費しますが、キャッシュからIConsumeContextを削除するために使用できる便利なメソッド_MessageWasReceivedSuccessfully_と_MessageWasMovedToErrorQueue_がいくつかあります。 – amstix

4

私はこの解決策をedndedました:

class MessageInterceptor: IInboundMessageInterceptor 
{ 
    public void PreDispatch(IConsumeContext context) 
    { 
     MessageTracker.Register(context); 
    } 

    public void PostDispatch(IConsumeContext context) 
    {} 
} 

class MessageTracker: InMemoryInboundMessageTracker 
{ 
    readonly Logger logger; 

    static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache = new ConcurrentDictionary<string, IConsumeContext>(); 

    public MessageTracker(int retryLimit, Logger logger) 
     : base(retryLimit) 
    { 
     this.logger = logger; 
    } 

    public static void Register(IConsumeContext context) 
    { 
     DispatchingCache.GetOrAdd(context.MessageId, context); 
    } 

    public override void MessageWasReceivedSuccessfully(string id) 
    { 
     base.MessageWasReceivedSuccessfully(id); 

     IConsumeContext value; 
     DispatchingCache.TryRemove(id, out value); 
    } 

    public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions) 
    { 
     var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions); 

     IConsumeContext failed; 
     if (!result || !DispatchingCache.TryRemove(id, out failed)) 
      return result; 

     // --> log failed IConsumeContext with exception 

     return true; 
    } 
} 

そして

 serviceBus = ServiceBusFactory.New(config => 
     { 
      ... 
      config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb => 
      { 
       var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline); 
       interceptorConfig.Create(new MessageInterceptor()); 
      })); 

      config.SetDefaultInboundMessageTrackerFactory(retryLimit => new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger())); 
     }); 
関連する問題