2016-11-28 24 views
0

私はAzureでEventHubを設定しました。データを読み取るコンシューマグループもあります。それは何日もうまくいきました。突然、受信データに遅延があることがわかります(約3日間)。私はWindowsサービスを使用してサーバー内のデータを消費します。 1分あたり約500の着信メッセージがあります。誰かがこれを理解するために私を助けることができますか?EventHubからのデータの取得が遅れる

+0

あなたはどのようにイベントハブからデータを読んでいますか? IEventProcessorインスタンスを使用していますか? –

+0

@ PeterBonsはいPeter、IEventProcessorインスタンスを使用しています。 – vishnu

答えて

1

あまりにも遅いアイテムを処理している可能性があります。したがって、行うべき仕事は成長し、あなたは遅れを取るでしょう。あなたはこのようなコードを使用することができますストリームは、イベントのどこにいるかでいくつかの洞察を得るために

private void LogProgressRecord(PartitionContext context) 
{ 
    if (namespaceManager == null) 
     return; 

    var currentSeqNo = context.Lease.SequenceNumber; 
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber; 
    var delta = lastSeqNo - currentSeqNo; 

    logWriter.Write(
      $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})", 
      EventLevel.Informational); 
} 

namespaceManagerは次のように構築さ:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz"); 

私はこのロギングメソッドで呼び出しますCloseAsync方法:

public Task CloseAsync(PartitionContext context, CloseReason reason) 
{ 
    LogProgressRecord(context); 

    return Task.CompletedTask; 
} 

logWriterはいくつかのloggiですクラス私はBLOBストレージに情報を書き込んでいました。

は、それが今、パーティション3のために

最終処理さseqnrのようなメッセージが出力されますconsumergroup 'テレメトリ' 内の32823804の32780931(ラグ:42873)

がとても遅れが非常に高い、あなたは可能性があります長い間前に発生したイベントを処理すること。その場合は、プロセッサをスケールアップ/アウトする必要があります。

遅れがある場合は、指定された数のアイテムを処理するのにかかる時間を測定する必要があります。次に、パフォーマンスを最適化し、パフォーマンスが向上するかどうかを確認できます。我々はそれを好きでした:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events) 
{ 
     try 
     { 
      stopwatch.Restart(); 

      // process items here 

      stopwatch.Stop(); 

      await CheckPointAsync(context); 

      logWriter.Write(
       $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.", 
       EventLevel.Informational); 
     } 
} 
+0

あなたの貴重な時間のためにPeterに感謝します。私は、dataprocessorのどんな高価な操作もしません。私は、着信レコードをEFを使用してフラットテーブルに挿入するだけです。私はちょうど遅れをチェックしました、そしてそれは各パーティション(4つのパーティションを持っている)のための100000以上です。私のWindowsサービスの複数のインスタンスを実行し、遅れを補うことは可能ですか? – vishnu

+1

はい、ですが、データベースによっては、EF/DBが負荷を処理できないことがあります。毎秒500メッセージはあまりありません。あなたは操作の時間を測定すべきです。更新された回答をご覧ください。 –

+0

そうかもしれません。しかし、私は25のバッチ更新を持っています。25はそれのための少数ですが、私はチェックします。その間に、コンシューマ・グループのアクティブなリスナーの数に制限はありますか?私もいくつかのインスタンスを作る予定だったので。 VSでは、すでに実行中のコンシューマ・グループを数分間実行しようとすると致命的なエラーが発生します。 – vishnu

関連する問題