0

私は晴れのサービスバスのトピックを使用しています。私は大きなメッセージを持っているので、私は大きなメッセージを分割していて、それらをsessionidと分割された小さなメッセージで送信しています。私は受信機にイベント駆動アーキテクチャを持たせたい。私は同じセッションIDを持つすべてのメッセージを受信する必要があり、適切な分割order.bellowでそれらを集計する必要が私のコードです。しかし、初めて私は蛇のコードからメッセージを取得しています。 2番目のメッセージでタイムアウトします。Azureサービスバスのトピックイベントドリブンアーキテクチャモデルとのセッションでメッセージを受信

  public class CRMESBListener : RoleEntryPoint 
      { 
       private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
       private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

      public override void Run() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running"); 
       try 
       { 
        DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner(); 
        dbMessageListener.Listen(); 
        runCompleteEvent.WaitOne(); 
        //this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
       } 
       finally 
       { 
        this.runCompleteEvent.Set(); 
       } 
      } 

      public override bool OnStart() 
      { 
       // Set the maximum number of concurrent connections 
       ServicePointManager.DefaultConnectionLimit = 12; 

       // For information on handling configuration changes 
       // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 


       bool result = base.OnStart(); 
       Bootstrapper.Init(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started"); 

       return result; 
      } 

      public override void OnStop() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping"); 

       this.cancellationTokenSource.Cancel(); 
       this.runCompleteEvent.WaitOne(); 

       base.OnStop(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped"); 
      } 

      private async Task RunAsync(CancellationToken cancellationToken) 
      { 
       // TODO: Replace the following with your own logic. 
       while (!cancellationToken.IsCancellationRequested) 
       { 
        Trace.TraceInformation("Working"); 
        await Task.Delay(1000); 
       } 
      } 
     } 













     public class DBMessageListener 
     { 
      #region Member Variables 

      private static DBMessageListener dbMessageListner; 
      private static object lockObject = new object(); 
      private TopicSubscribeClientWrapper accountTopicClient; 

      private NamespaceManager namespaceManager; 
      private OnMessageOptions eventDrivenMessagingOptions; 

      private int crmIntegrationUserID = Common.CrmCurrentUser.UserID; 

      #endregion Member Variables 

      #region Constructors 

      private DBMessageListener() 
      { 
       string subscriptionName = "AllMessages"; 
       namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString); 

       if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName)) 
       { 
        namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName); 
       } 
       accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath); 
       accountTopicClient.SubscriptionName = subscriptionName; 



       eventDrivenMessagingOptions = new OnMessageOptions 
       { 
        AutoComplete = true 
       }; 

       eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived; 
       eventDrivenMessagingOptions.MaxConcurrentCalls = 5; 
      } 

      #endregion Constructors 

      #region Methods 

      private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message) 
      { 
       if (message != null) 
       { 
        try 
        { 
         await ProcessDBMessage(message.GetBody<ServiceBusMessage>()); 
        } 
        catch (Exception ex) 
        { 
         //log exception 
        } 
       } 

      } 

      private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e) 
      { 
       if (e != null && e.Exception != null) 
       { 

       } 
      } 

      private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message) 
      { 

    //process message   
      } 

      public static DBMessageListener GetDBMessageListner() 
      { 
       if (dbMessageListner == null) 
       { 
        lock (lockObject) 
        { 
         if (dbMessageListner == null) 
         { 
          dbMessageListner = new DBMessageListener(); 
         } 
        } 
       } 

       return dbMessageListner; 
      } 

      public void Listen() 
      { 
       accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions); 

      } 

      #endregion Methods 
     } 


public class TopicSubscribeClientWrapper : IServiceBusClientWrapper 
    { 
     #region Member Variables 

     private readonly string _connectionString; 
     private readonly string _topicName; 
     private readonly TopicClient _topicClient; 
     private SubscriptionClient _subscriptionClient; 

     #endregion Member Variables 

     #region Properties 

     public string SubscriptionName { get; set; } 

     #endregion Properties 

     #region Constructors 

     public TopicSubscribeClientWrapper(string connectionString, string topicName) 
     { 
      _connectionString = connectionString; 
      _topicName = topicName; 
      _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName); 
     } 

     #endregion Constructors 

     #region Event Handlers 

     public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions) 
     { 

      _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName); 

      // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

      MemoryStream largeMessageStream = new MemoryStream(); 
      MessageSession session = _subscriptionClient.AcceptMessageSession(); 

      while (true) 
      { 
       BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5)); 

       if (subMessage != null) 
       { 
        Stream subMessageStream = subMessage.GetBody<Stream>(); 
        subMessageStream.CopyTo(largeMessageStream); 

        subMessage.Complete(); 
        //Console.Write("."); 
       } 
       else 
       { 
        //Console.WriteLine("Done!"); 
        break; 
       } 
      } 

      BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true); 
      var message = onMessageCallback.Method.GetParameters(); 
      message.SetValue(largeMessage, 1); 
      _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

     } 

     #endregion Event Handlers 

     #region Methods 

     public Task SendAsync(BrokeredMessage message) 
     { 
      return _topicClient.SendAsync(message); 
     } 

     public void Close() 
     { 
      if (_subscriptionClient != null) 
      { 
       _subscriptionClient.Close(); 
      } 

      _topicClient.Close(); 
     } 

     #endregion Methods 
    } 

答えて

1

私は別のルートをとることをお勧めします。大量のメッセージを送信するメッセージのセッションを作成するのではなく、特にこの問題に対処しているclaim check pattern(大きな添付ファイル)を使用してください。ストレージBLOBにデータを書き込み、そのURIと共にメッセージを送信します。ペイロードをチャンクで送るのではなく、ブロブを保存/復元するほうがはるかに簡単です。また、この方法では、システムを監視するのが簡単です(1つまたは複数のブロブに関連付けられた成功/失敗メッセージが1つ失敗しました)セッションや何か特別なものを使わなければなりません。

関連する問題