2017-02-20 15 views
0

の問題が発生しました。ここでは、書き込み量が0に達してから通常のレベルに達したときに新しいインスタンスIMessageSessionAsyncHandlerが作成されません。AzureサービスバスSessionHandlerのパーティション化されたキューの問題

より正確には、MaxConcurrentSessionsの値がSessionHandlerOptionsです。これにより、1k msg/s以上の速度で読み取ることができます。 私が読むキューはで、キューはです。 キュー内のメッセージの量はかなり一定ですが、時々0になります。ボリュームが通常のレベルに戻ると、SessionFactoryはハンドラを生成しないため、メッセージをもう読み取ることができません。それはセッションが正しくリサイクルされなかったか、または一種の連続した待っているようなものです。

private void RegisterHandler() 
    { 
     var sessionHandlerOptions = new SessionHandlerOptions 
     { 
      AutoRenewTimeout = TimeSpan.FromMinutes(1), 
      MessageWaitTimeout = TimeSpan.FromSeconds(1), 
      MaxConcurrentSessions = 500 
     }; 
     _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions); 
    } 

ファクトリクラス:

public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandlerFactory(Action<BrokeredMessage> callback) 
    { 
     _callback = callback; 
    } 

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message) 
    { 
     return new SessionHandler(session.SessionId, _callback); 
    } 

    public void DisposeInstance(IMessageSessionAsyncHandler handler) 
    { 
     var disposable = handler as IDisposable; 

     disposable?.Dispose(); 
    } 
} 

ハンドラ:ここ

が登録工場のためのコードである

public class SessionHandler : MessageSessionAsyncHandler 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandler(string sessionId, Action<BrokeredMessage> callback) 
    { 
     SessionId = sessionId; 
     _callback = callback; 
    } 

    public string SessionId { get; } 

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message) 
    { 
     try 
     { 
      _callback(message); 
     } 
     catch (Exception ex) 
     { 
      Logger.Error(...); 
     } 
    } 

私は、セッションハンドラがあることがわかります書込み/読取りが正常レベルにあるときに工場が廃棄されることを保証する。ただし、キューが空になると、新しいセッションハンドラが作成されることはありません。一定期間使用しないと同じセッションを再割り当てすることを禁止するセッションIDを割り当てるポリシーはありますか?

編集1: 私は行動を説明するために二つの絵を追加している:作家を停止して再起動する enter image description here

、実行している読者は同じくらい前に読み取ることができません。

その瞬間後に作成されたセッションの数も以前よりはるかに低いです: enter image description here

答えて

0

キュー内のメッセージのボリュームはかなり一定であるが、時間から時間には0ときに降りますボリュームが正常なレベルに戻ったとき、SessionFactoryはハンドラを生成しないので、私はもはやメッセージを読むことができません。それはセッションが正しくリサイクルされなかったか、または一種の連続した待っているようなものです。

IMessageSessionAsyncHandlerインスタンスが作成される方法を制御するためにIMessageSessionHandlerFactoryを使用して、あなたのIMessageSessionAsyncHandlerのすべてのインスタンスの作成と破棄を記録しようとすることができます。

コードに基づいて、この問題のコンソールアプリケーションを作成しました。ここでは、キュークライアントを初期化し、メッセージを処理するための私のコードスニペットは、次のとおりです。私のテストパー

static void InitializeReceiver(string connectionString, string queuePath) 
{ 
    _queueClient = QueueClient.CreateFromConnectionString(connectionString, queuePath, ReceiveMode.PeekLock); 

    var sessionHandlerOptions = new SessionHandlerOptions 
    { 
     AutoRenewTimeout = TimeSpan.FromMinutes(1), 
     MessageWaitTimeout = TimeSpan.FromSeconds(5), 
     MaxConcurrentSessions = 500 
    }; 
    _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(OnMessageHandler), sessionHandlerOptions); 
} 

OnMessageHandler

static void OnMessageHandler(BrokeredMessage message) 
{ 
    var body = message.GetBody<Stream>(); 

    dynamic recipeStep = JsonConvert.DeserializeObject(new StreamReader(body, true).ReadToEnd()); 
    lock (Console.Out) 
    { 
     Console.ForegroundColor = ConsoleColor.Cyan; 
     Console.WriteLine(
      "Message received: \n\tSessionId = {0}, \n\tMessageId = {1}, \n\tSequenceNumber = {2}," + 
      "\n\tContent: [ title = {3} ]", 
      message.SessionId, 
      message.MessageId, 
      message.SequenceNumber, 
      recipeStep.title); 
     Console.ResetColor(); 
    } 
    Task.Delay(TimeSpan.FromSeconds(3)).Wait(); 
    message.Complete(); 
} 

InitializeReceiver

予想通り、SessionHandlerは時に音量仕事ができますキュー内のメッセージの正常からゼロへ、0から正常へのメッセージを次のようにして返します。

また、私は、この問題をテストするためにQueueClient.RegisterSessionHandlerAsyncを活用しようとしましたが、それは同様に動作します。さらに、私は約Service Bus Sessionsこのgitのサンプルを見つけた、あなたはそれを参照することができます。

+0

ありがとうBruce、私は振る舞いを説明するために質問に2つの画像を追加しました。 –

関連する問題