2017-07-08 12 views

答えて

0

メッセージを受け取ったときには、トピックとパーティションと、それが来た場所からのオフセット(キーと値に加えて)が含まれている必要があります。 example hereから

:それは前の回答に加えて、各トピックのパーティション

consumer.OnPartitionEOF += (_, end) 
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" + 
      $" , next message will be at offset {end.Offset}"); 
1

の終わりに到達したときにも、イベントを取得

consumer.OnMessage += (_, msg) 
    => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " + 
     $"Offset: {msg.Offset} {msg.Value}"); 

、あなたが

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions) 
を使用することができます

特定のトピック/パーティションに対して、librdkafkaからポーリングされた最後のオフセットを返します

また、あなたはそれがカフカのクラスタにリクエストを送信します最新知らオフセット

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) 

を照会することができ、消費者


からのオフセットを犯した最新のために、同様のCommitted方法があります。コールがブロックされていて、適切なタイムアウトを設定してください。現在、一度に複数のパーティションで要求を送信することはできません。 あなたは

librdkafkaに内部状態を照会し、そしてINVALID_OFFSET(-1001)を返すことができ
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) 

もありどちらか、最後のオフセット知ら取得するか、それを使用することができますラグ

を計算します。データを処理することによって、それを使用していくつかの遅延を検出することができます。代わりに、消費者からのオフセット情報を検索する(位置と、この方法の結果との差)

0

私はこのような生産者からのトピックオフセット(highlow)を読み取ることができた(私は最初のメッセージを消費したくなかった):

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10)); 
関連する問題