2016-07-22 16 views
0

Azure Service Busで設定したトピックから受信したメッセージをワーカーロールを使用して処理するクラウドサービスがあります。Azureトピック作業者ロールは60秒後にメッセージの処理を停止します

メッセージ自体は到着していないようで、通常は正しく受信され、正しく処理されます。ただし、メッセージが処理を停止するように見える場合があります(ログが突然終了し、処理中のメッセージへの参照がWadLogsTableには表示されません)。私の研究では、これは、接続を開いてアイドル状態を数秒間保持し続ける作業者の役割のために起こっている可能性があります。これらの長いメッセージを放棄しないようにするにはどうしたらいいですか?

私たちのワーカーロールのコードは以下のとおりです。

public class WorkerRole : RoleEntryPoint 
{ 
    private static StandardKernel _kernel; 
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false); 
    private BaseRepository<CallData> _callDataRepository; 
    private BaseRepository<CallLog> _callLogRepository; 

    private SubscriptionClient _client; 
    private NamespaceManager _nManager; 
    private OnMessageOptions _options; 
    private BaseRepository<Site> _siteRepository; 

    public override void Run() 
    { 
     try 
     { 
      List<CallInformation> callInfo; 
      Trace.WriteLine("Starting processing of messages"); 

      // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump. 

      _client.OnMessage(message => 
      { 
       // Process message from subscription. 
       Trace.TraceInformation("Call Received. Ready to process message "); 
       message.RenewLock(); 
       callInfo = message.GetBody<List<CallInformation>>(); 
       writeCallData(callInfo); 


       Trace.TraceInformation("Call Processed. Clearing from topic."); 
      }, _options); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace); 
     } 
    } 

    private void writeCallData(List<CallInformation> callList) 
    { 
     try 
     { 
      Trace.TraceInformation("Calls received: " + callList.Count); 
      foreach (var callInfo in callList) 
      { 
       Trace.TraceInformation("Unwrapping call..."); 
       var call = callInfo.CallLog.Unwrap(); 
       Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints"); 
       Trace.TraceInformation("Inserting Call..."); 
       _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/); 
        Trace.TraceInformation("Call entry written. Now building datapoint list..."); 
        var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList(); 
        Trace.TraceInformation("datapoint list constructed. Processing datapoints..."); 
        foreach (var data in datapoints) 
        { 
         /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */ 
        } 
        Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID); 
       Trace.TraceInformation("Call Processed successfully."); 
      } 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Call Processing Failed. " + e.Message); 
     } 
    } 

    public override bool OnStart() 
    { 
     try 
     { 
      var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
      _nManager = NamespaceManager.CreateFromConnectionString(connectionString); 
      _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0); 
      var topic = new TopicDescription("MyTopic") 
      { 
       DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0), 
       DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0), 
       RequiresDuplicateDetection = true, 
      }; 
      if (!_nManager.TopicExists("MyTopic")) 
      { 
       _nManager.CreateTopic(topic); 
      } 
      if (!_nManager.SubscriptionExists("MyTopic", "AllMessages")) 
      { 
       _nManager.CreateSubscription("MyTopic", "AllMessages"); 
      } 
      _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages", 
       ReceiveMode.ReceiveAndDelete); 
      _options = new OnMessageOptions 
      { 
        AutoRenewTimeout = TimeSpan.FromMinutes(5), 

      }; 
      _options.ExceptionReceived += LogErrors; 
      CreateKernel(); 

      _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace); 
     } 
     return base.OnStart(); 
    } 

    public override void OnStop() 
    { 
     // Close the connection to Service Bus Queue 
     _client.Close(); 
     _completedEvent.Set(); 
    } 

    void LogErrors(object sender, ExceptionReceivedEventArgs e) 
    { 
     if (e.Exception != null) 
     { 
      Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace); 
      _client.Close(); 
     } 
    } 

    public IKernel CreateKernel() 
    { 
     _kernel = new StandardKernel(); 
     /*SNIP: Bind NInjectable repositories */ 
     return _kernel; 
    } 
} 
+0

実行が終了すると、私はワーカーロールが数秒待ってからOnStartを再度呼び出してRun()を再入力することを確認します。 –

答えて

1

Dudeの応答は正解に非常に近いです! runメソッドがすぐに戻る代わりに生き続ける必要があることが正しいことが分かります。しかし、Azure Service Busのメッセージポンプ機構では_client.onMessage(...)をwhileループの中に置くことはできません。これによりエラーが発生します(メッセージポンプはすでに初期化されています)。

実際に何が起こる必要があるかは、ワーカーロールが実行を開始する前に手動リセットイベントを作成してから、メッセージポンプコードを実行した後に待機する必要があることです。 ManualResetEventのドキュメントについては、https://msdn.microsoft.com/en-us/library/system.threading.manualresetevent(v=vs.110).aspxを参照してください。さらに、プロセスはここで説明されていますhttp://www.acousticguitar.pro/questions/607359/using-queueclient-onmessage-in-an-azure-worker-role

私の最後のWorkerロールクラスは次のようになります。

あなたはManualResetEventとWAITONEの呼び出しの存在に気付くでしょう

public class WorkerRole : RoleEntryPoint 
{ 
    private static StandardKernel _kernel; 
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false); 
    private BaseRepository<CallLog> _callLogRepository; 

    private SubscriptionClient _client; 
    private MessagingFactory _mFact; 
    private NamespaceManager _nManager; 
    private OnMessageOptions _options; 

    public override void Run() 
    { 
     ManualResetEvent CompletedEvent = new ManualResetEvent(false); 
     try 
     { 
      CallInformation callInfo; 
      // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump. 
      _client.OnMessage(message => 
      { 
       // Process message from subscription. 
       Trace.TraceInformation("Call Received. Ready to process message " + message.MessageId); 
       callInfo = message.GetBody<CallInformation>(); 
       WriteCallData(callInfo); 

       Trace.TraceInformation("Call Processed. Clearing from topic."); 
      }, _options); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace); 
     } 
     CompletedEvent.WaitOne(); 
    } 

private void writeCallData(List<CallInformation> callList) 
{ 
    try 
    { 
     Trace.TraceInformation("Calls received: " + callList.Count); 
     foreach (var callInfo in callList) 
     { 
      Trace.TraceInformation("Unwrapping call..."); 
      var call = callInfo.CallLog.Unwrap(); 
      Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints"); 
      Trace.TraceInformation("Inserting Call..."); 
      _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/); 
       Trace.TraceInformation("Call entry written. Now building datapoint list..."); 
       var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList(); 
       Trace.TraceInformation("datapoint list constructed. Processing datapoints..."); 
       foreach (var data in datapoints) 
       { 
        /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */ 
       } 
       Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID); 
      Trace.TraceInformation("Call Processed successfully."); 
     } 
    } 
    catch (Exception e) 
    { 
     Trace.TraceInformation("Call Processing Failed. " + e.Message); 
    } 
} 

public override bool OnStart() 
{ 
    try 
    { 
     var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
     _nManager = NamespaceManager.CreateFromConnectionString(connectionString); 
     _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0); 
     var topic = new TopicDescription("MyTopic") 
     { 
      DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0), 
      DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0), 
      RequiresDuplicateDetection = true, 
     }; 
     if (!_nManager.TopicExists("MyTopic")) 
     { 
      _nManager.CreateTopic(topic); 
     } 
     if (!_nManager.SubscriptionExists("MyTopic", "AllMessages")) 
     { 
      _nManager.CreateSubscription("MyTopic", "AllMessages"); 
     } 
     _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages", 
      ReceiveMode.ReceiveAndDelete); 
     _options = new OnMessageOptions 
     { 
       AutoRenewTimeout = TimeSpan.FromMinutes(5), 

     }; 
     _options.ExceptionReceived += LogErrors; 
     CreateKernel(); 

     _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/); 
    } 
    catch (Exception e) 
    { 
     Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace); 
    } 
    return base.OnStart(); 
} 

public override void OnStop() 
{ 
    // Close the connection to Service Bus Queue 
    _client.Close(); 
    _completedEvent.Set(); 
} 

void LogErrors(object sender, ExceptionReceivedEventArgs e) 
{ 
    if (e.Exception != null) 
    { 
     Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace); 
     _client.Close(); 
    } 
} 

public IKernel CreateKernel() 
{ 
    _kernel = new StandardKernel(); 
    /*SNIP: Bind NInjectable repositories */ 
    return _kernel; 
} 

}()私のRunメソッドの終わりに。私は誰かがこれが役立つことを願っています!

1

Runメソッドは無期限に実行されません。 docsから撮影

public override void Run() 
{ 
    try 
    { 
     Trace.WriteLine("WorkerRole entrypoint called", "Information"); 
     while (true) 
     { 
     // Add code here that runs in the role instance 
     } 

    } 
    catch (Exception e) 
    { 
     Trace.WriteLine("Exception during Run: " + e.ToString()); 
     // Take other action as needed. 
    } 
} 

実行がアプリケーションのための主な方法であると考えられているそれは次のようになります。 をオーバーライドします。Runメソッドは必須ではありません。デフォルト実装は決して を返します。 Runメソッドをオーバーライドすると、コードでは が無期限にブロックされます。 Runメソッドが返された場合、ロールは がオフラインになる前にシャットダウンシーケンスが実行されるように、Stopイベントを発生させてOnStopメソッド を呼び出すことによって自動的に がリサイクルされます。

+0

こんにちは、この回答者のおかげです。それは私が実際に起こる必要があったものにとても近くなった。間違いなく価値がありますが、私の回答で説明します。 –

関連する問題