0

私は現在Internet Of Thingsに取り組んでいます。私の現在のプロジェクトでは、私はWorker Roleを作成したことでOne Azure Cloud Serviceプロジェクトを作成しました。私は以下のコード行を書きました。ProcessEventsync(PartitionContextコンテキスト、ienumerable <EventData>メッセージ)メソッドが実行されたとき

public class WorkerRole : RoleEntryPoint 
{ 
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

    private static string connectionString; 
    private static string eventHubName; 
    public static ServiceClient iotHubServiceClient { get; private set; } 
    public static EventHubClient eventHubClient { get; private set; } 

    public override void Run() 
    { 
     Trace.TraceInformation("EventsForwarding Run()...\n"); 

     try 
     { 
      this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
     } 
     finally 
     { 
      this.runCompleteEvent.Set(); 
     } 
    } 

    public override bool OnStart() 
    { 
     // Set the maximum number of concurrent connections 
     ServicePointManager.DefaultConnectionLimit = 12; 

     // For information on handling configuration changes 
     // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 

     bool result = base.OnStart(); 

     Trace.TraceInformation("EventsForwarding OnStart()...\n"); 

     connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"]; 
     eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"]; 

     string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"]; 
     string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"]; 
     string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", 
      storageAccountName, storageAccountKey); 

     string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"]; 
     iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString); 
     eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName); 

     var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); 

     string eventProcessorHostName = "SensorEventProcessor"; 
     EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString); 
     eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>().Wait(); 

     Trace.TraceInformation("Receiving events...\n"); 

     return result; 
    } 

    public override void OnStop() 
    { 
     Trace.TraceInformation("EventsForwarding is OnStop()..."); 

     this.cancellationTokenSource.Cancel(); 
     this.runCompleteEvent.WaitOne(); 

     base.OnStop(); 

     Trace.TraceInformation("EventsForwarding has stopped"); 
    } 

    private async Task RunAsync(CancellationToken cancellationToken) 
    { 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      //Trace.TraceInformation("EventsToCommmandsService running...\n"); 
      await Task.Delay(1000); 

     } 
    } 
} 

次の私は、イベントのハブからメッセージを受信したIoTハブにそれらのメッセージを送信するために、SensorEventProcessorのコードの以下の行を書いています。私は自分のコードをデバッグした時

class SensorEventProcessor : IEventProcessor 
{ 
    Stopwatch checkpointStopWatch; 
    PartitionContext partitionContext; 

    public async Task CloseAsync(PartitionContext context, CloseReason reason) 
    { 
     Trace.TraceInformation(string.Format("EventProcessor Shuting Down. Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString())); 
     if (reason == CloseReason.Shutdown) 
     { 
      await context.CheckpointAsync(); 
     } 
    } 

    public Task OpenAsync(PartitionContext context) 
    { 
     Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset)); 
     this.partitionContext = context; 
     this.checkpointStopWatch = new Stopwatch(); 
     this.checkpointStopWatch.Start(); 
     return Task.FromResult<object>(null); 
    } 

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     Trace.TraceInformation("\n"); 
     Trace.TraceInformation("........ProcessEventsAsync........"); 
     //string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}"; 
     //await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew))); 
     foreach (EventData eventData in messages) 
     { 
      try 
      { 
       string jsonString = Encoding.UTF8.GetString(eventData.GetBytes()); 

       Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'", 
        eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId)); 

       Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString)); 

       SimpleTemperatureAlertData newSensorEvent = this.DeserializeEventData(jsonString); 

       Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'", 
        newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId)); 

       // Issuing alarm to device. 
       string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}"; 
       Trace.TraceInformation("Issuing alarm to device: '{0}', from sensor: '{1}'", newSensorEvent.DeviceId, newSensorEvent.RoomTemp); 
       Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew); 
       await WorkerRole.iotHubServiceClient.SendAsync(newSensorEvent.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew))); 
      } 
      catch (Exception ex) 
      { 
       Trace.TraceInformation("Error in ProssEventsAsync -- {0}\n", ex.Message); 
      } 
     } 

     await context.CheckpointAsync(); 
    } 
    private SimpleTemperatureAlertData DeserializeEventData(string eventDataString) 
    { 
     return JsonConvert.DeserializeObject<SimpleTemperatureAlertData>(eventDataString); 
    } 

} 

、ProcessEventsAsync(PartitionContextコンテキスト、IEnumerableをメッセージ)メソッドを呼び出していないとただのデバッグitstopその後、openAsync()メソッドのどちらに入るん。

教えてください私のプロジェクトで間違いを犯して、ProcessEventsAsync()メソッドがいつ呼び出されるか教えてください。 EventHubで未処理のメッセージがあるとき

よろしく、

プラディープ

答えて

1

IEventProcessor.ProcessEventsAsyncが呼び出されます。

イベントハブには複数のパーティションが含まれています。パーティションは、順序付けられた一連のイベントです。パーティション内では、各イベントにオフセットが含まれます。このオフセットは、コンシューマ(IEventProcessor)によって、指定されたパーティションのイベントシーケンス内の位置を示すために使用されます。 IEventProcessorが接続すると(EventProcessorHost.RegisterEventProcessorAsync)、このオフセットがイベントハブに渡され、読み込みを開始する場所が指定されます。未処理のメッセージ(オフセットの大きいイベント)があると、それらはIEventProcessorに渡されます。チェックポイントは、処理されたメッセージのオフセットを保持するために使用されます(PartitionContext.CheckpointAsync)。

あなたはEventHubの内部に関する詳細な情報を見つけることができます:Azure Event Hubs overview

あなたはEventHub(EventHubClient.SendAsync(EventDataの))にメッセージを送ったことがありますか?

+0

はい、SendAsync()メソッドを使用してイベントハブにメッセージを送信しません。私の要件では、ストリームアナライザのイベントハブとして出力ジョブを設定しました。ストリームアナライザは、このイベントハブに値を送信し、IoTハブとして入力を使用しました。 – pradeep

+0

Azure Portalを介して、最近EventHubが受け取ったメッセージがあることを確認できますか? –

+0

私はそれが受信されたと思う私の青空のポータルダッシュボードでは、イベントハブのメッセージを表示されません。 – pradeep

関連する問題