2016-10-17 9 views
0

私はeventhubの顧客を送って、天気などのサンプルデータにダウンロードして別のイベントハブを送信したいと思います。私のコードが正しく動作していません。エラーはありませんが、データはデータベースに送られません。どのようにeventhubから別のeventhubにメッセージを送信しますか?

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
     { 
       try 
      { 
       foreach (EventData message in messages) 
       { 
         string data = Encoding.UTF8.GetString(message.GetBytes()); 
         NewClient Client = JsonConvert.DeserializeObject<NewClient>(data); 
        if (Client.City != null && Client.Street != null) 
        { 

         GoogleGeoApi GeoClient = new GoogleGeoApi(); 
         GeoClient.SetCoordinates(Client.City, Client.Street); 
         WeatherApi WeatherApiobject = new WeatherApi(); 
         WeatherApiobject.GetJson(GeoClient.convertlat, GeoClient.convertlng); 
         string weatherdata = WeatherApiobject.sendEvent; 
         SenderEvent NewSenderEvent = new SenderEvent(); 
         NewSenderEvent.DataSender(weatherdata, ConstFile.WeatherEventHubName); 
         //StartH(ConstFile.WeatherEventHubName).Wait(); 
        } 

        Interlocked.Increment(ref this.totalMessages); 
        this.LastMessageOffset = message.Offset; 
       } 

       if (this.IsClosed) 
       { 
        this.IsReceivedMessageAfterClose = true; 
       } 

       if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1)) 
       { 
        lock (this) 
        { 
         this.checkpointStopWatch.Reset(); 
         return context.CheckpointAsync(); 
        } 
       } 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("{0} > Event Hub Exception: {1}", DateTime.Now, ex.Message); 
      } 

      return Task.FromResult<object>(null); 
     } 

私は私の受信eventhubはこのようになっていることを追加します:あなたは非常に簡単にこれを行うには、ストリームAnalyticsを使用することができます https://code.msdn.microsoft.com/Service-Bus-Event-Hub-45f43fc3/view/SourceCode#content

+0

このクラスは 'SenderEvent'とは異なり、そのクラスは別の' EventHub'にデータを送信しますか?もしそうなら、そのコードを投稿してください。 –

+0

以下のコードを貼り付けます。 SenderEventは両方のメッセージ(クライアントと天気予報)で同じです – Kamil

+0

そして、アプリケーションは貼り付けられたコードの 'eventhubclient.Send(data1);'という行に例外なく入りますか?そして、どのプロセスが2番目のイベントハブを聞いていますか? –

答えて

0
public class SenderEvent 
    { 
     public void DataSender(string data, string eventhubname) 
     { 
      var eventhubclient = EventHubClient.CreateFromConnectionString(ConstFile.eventHubConnectionString, eventhubname);  
      EventData data1 = new EventData(Encoding.UTF8.GetBytes(data)); 
      eventhubclient.Send(data1); 
     } 
0

!最初のイベントハブはストリームアナリティクスのinputです。次に、ストリームに対してqueryを書くことができます([Input1]から選択* ...これですべてが得られます)。 outputストリームを別のイベントハブに戻すことができます。

0
private static async Task StartHost(string eventHubName) 
     { 
      string eventProcessorHostName = "1"; 
      string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", ConstFile.storageAccountName, ConstFile.storageAccountKey); 
      host = new EventProcessorHost(
       eventProcessorHostName, 
       eventHubName, 
       ConstFile.ConsumerGroup, 
       ConstFile.eventHubConnectionString, 
       storageConnectionString, eventHubName.ToLowerInvariant()); 

      factory = new DemoEventProcessorFactory(eventProcessorHostName); 

      try 
      { 
       var options = new EventProcessorOptions(); 
       options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); }; 
       await host.RegisterEventProcessorFactoryAsync(factory); 
      } 
      catch (Exception exception) 
      { 
       Console.ForegroundColor = ConsoleColor.Red; 
       Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message); 
       Console.ResetColor(); 
      } 
     } 
    } 
} 
関連する問題