2016-10-06 8 views
1

私は初心者で、問題があります。受信者イベントハブ内の個々のメッセージを置き換えることはできません。どのように私はこれを修正する方法を知っていますか?ReceiverイベントハブのEvenDataメッセージの編集方法は?

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 


      try 
     { 
      foreach (EventData message in messages) 
      { 
        string data = Encoding.UTF8.GetString(message.GetBytes()); 
        NewClient Client = JsonConvert.DeserializeObject<NewClient>(data); 
        GPSApi Gpsobject = new GPSApi(); 
        Gpsobject.GetJson(Client.City, Client.Street); 
        Gpsobject.DeserializeJson(); 
        Gpsobject.ConvertJson(); 
        WeatherApi WeatherApiobject = new WeatherApi(); 
        WeatherApiobject.GetJson(Gpsobject.convertlat, Gpsobject.convertlng); 
        data = WeatherApiobject.sendEvent; 
        EventData data1 = new EventData(Encoding.UTF8.GetBytes(data)); 
        //message = data1; 
       // LUB TUTAJ      
       Interlocked.Increment(ref this.totalMessages); 
       this.LastMessageOffset = data1.Offset; 
      } 

私はループを試しました。例えば

(int i=0; i<messages.Count(); i++) 

また、動作しません。

+0

あなたのメッセージがjson形式であるとみなすことによって、 jsonリストでメッセージをデシリアライズし、各メッセージをループすることができます。 – DSA

+0

私はあなたが達成しようとしていることは本当にわかりません。ループ内のメッセージを変更したいですか?そして何?どのような目的のために? –

+0

DSA、これをどのようにして表示することができますか? – Kamil

答えて

0

メッセージの顧客データを受信した後、それを天気にダウンロードし、天気のメッセージを置き換えてストリーム分析に引き続き送信します。

私はこれが可能ではないと思います。 EventProcessorを使用してメッセージを処理しています。それはイベントストリームになると終わりです。ここからは、永続化などのデータを使って何かを行うことができますが、イベントを変更/上書きできず、同じイベントハブストリームに戻すことはできません。

異なるコンシューマーグループを使用して複数のコンシューマーがストリームを読み込んでいるときに、どのように対処でき、コンシューマーグループがストリーム内の他のポジションにある場合はどうすれば対処できますか?

あなたはしかし、いくつかのオプションがあります。今、あなたのようなすべてのメッセージが何Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)ループでは

  1. をし、ストリーム解析を仕事に接続されている順番に、その後で別のイベント・ハブに送信します。
  2. Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)のようにすべてのメッセージをループして、気象データを取得し、それをAzure Blob Storageに保存します。

オプション2を調べると、ブロブコンテナをストリームアナリティクスジョブの入力として接続できます。

+0

ありがとうございます。しかし、別の問題があります。データを別のイベントハブに配送したいのですが、次のコードは機能しません。 – Kamil

0

ありがとうございます。しかし、別の問題があります。データを別のイベントハブに配送したいのですが、次のコードは機能しません。

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
      try 
     { 
      foreach (EventData message in messages) 
      { 
        string data = Encoding.UTF8.GetString(message.GetBytes()); 
        NewClient Client = JsonConvert.DeserializeObject<NewClient>(data); 
       if (Client.City != null && Client.Street != null) 
       {            
        string weatherdata = WeatherApiobject.sendEvent; 
        SenderEvent NewSenderEvent = new SenderEvent(); 
        NewSenderEvent.DataSender(weatherdata, ConstFile.WeatherEventHubName); 
        StartH(ConstFile.WeatherEventHubName).Wait(); 
       } 

       Interlocked.Increment(ref this.totalMessages); 
       this.LastMessageOffset = message.Offset; 
      } 

      if (this.IsClosed) 
      { 
       this.IsReceivedMessageAfterClose = true; 
      } 

      if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1)) 
      { 
       lock (this) 
       { 
        this.checkpointStopWatch.Reset(); 
        return context.CheckpointAsync(); 
       } 
      } 
     } 

    private static async Task StartH(string eventhubname) 
    { 
     string eventProcessorHostName = "1"; 
     string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", ConstFile.storageAccountName, ConstFile.storageAccountKey); 
     EventProcessorHost host = new EventProcessorHost(
      eventProcessorHostName, 
      ConstFile.NewClientEventHubName, 
      ConstFile.ConsumerGroup, 
      ConstFile.eventHubConnectionString, 
      storageConnectionString, eventhubname.ToLowerInvariant()); 

     DemoEventProcessorFactory factory = new DemoEventProcessorFactory(eventProcessorHostName); 

     try 
     { 
      var options = new EventProcessorOptions(); 
      options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); }; 
      await host.RegisterEventProcessorFactoryAsync(factory); 
     } 
     catch (Exception exception) 
     { 
      Console.ForegroundColor = ConsoleColor.Red; 
      Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message); 
      Console.ResetColor(); 
     } 

     host.UnregisterEventProcessorAsync().Wait(); 
    } 
関連する問題