の問題が発生しました。ここでは、書き込み量が0に達してから通常のレベルに達したときに新しいインスタンスIMessageSessionAsyncHandler
が作成されません。AzureサービスバスSessionHandlerのパーティション化されたキューの問題
より正確には、MaxConcurrentSessions
の値がSessionHandlerOptions
です。これにより、1k msg/s以上の速度で読み取ることができます。 私が読むキューはで、キューはです。 キュー内のメッセージの量はかなり一定ですが、時々0になります。ボリュームが通常のレベルに戻ると、SessionFactoryはハンドラを生成しないため、メッセージをもう読み取ることができません。それはセッションが正しくリサイクルされなかったか、または一種の連続した待っているようなものです。
private void RegisterHandler()
{
var sessionHandlerOptions = new SessionHandlerOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(1),
MessageWaitTimeout = TimeSpan.FromSeconds(1),
MaxConcurrentSessions = 500
};
_queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions);
}
ファクトリクラス:
public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandlerFactory(Action<BrokeredMessage> callback)
{
_callback = callback;
}
public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
{
return new SessionHandler(session.SessionId, _callback);
}
public void DisposeInstance(IMessageSessionAsyncHandler handler)
{
var disposable = handler as IDisposable;
disposable?.Dispose();
}
}
ハンドラ:ここ
が登録工場のためのコードである
public class SessionHandler : MessageSessionAsyncHandler
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandler(string sessionId, Action<BrokeredMessage> callback)
{
SessionId = sessionId;
_callback = callback;
}
public string SessionId { get; }
protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
try
{
_callback(message);
}
catch (Exception ex)
{
Logger.Error(...);
}
}
私は、セッションハンドラがあることがわかります書込み/読取りが正常レベルにあるときに工場が廃棄されることを保証する。ただし、キューが空になると、新しいセッションハンドラが作成されることはありません。一定期間使用しないと同じセッションを再割り当てすることを禁止するセッションIDを割り当てるポリシーはありますか?
編集1: 私は行動を説明するために二つの絵を追加している:作家を停止して再起動する
、実行している読者は同じくらい前に読み取ることができません。
その瞬間後に作成されたセッションの数も以前よりはるかに低いです:
ありがとうBruce、私は振る舞いを説明するために質問に2つの画像を追加しました。 –