私はConfluent kafka C#clientを使用しています。これでトピックから消費された最新のオフセットを取得する方法は?Confluent kafka C#ライブラリのKafkaトピックから最新のオフセットを取得するには?
0
A
答えて
0
メッセージを受け取ったときには、トピックとパーティションと、それが来た場所からのオフセット(キーと値に加えて)が含まれている必要があります。 example hereから
:それは前の回答に加えて、各トピックのパーティション
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
$" , next message will be at offset {end.Offset}");
1
の終わりに到達したときにも、イベントを取得
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
$"Offset: {msg.Offset} {msg.Value}");
、あなたが
List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
を使用することができます
特定のトピック/パーティションに対して、librdkafkaからポーリングされた最後のオフセットを返します
また、あなたはそれがカフカのクラスタにリクエストを送信します最新知らオフセット
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
を照会することができ、消費者
からのオフセットを犯した最新のために、同様のCommitted
方法があります。コールがブロックされていて、適切なタイムアウトを設定してください。現在、一度に複数のパーティションで要求を送信することはできません。 あなたは
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
もありどちらか、最後のオフセット知ら取得するか、それを使用することができますラグ
を計算します。データを処理することによって、それを使用していくつかの遅延を検出することができます。代わりに、消費者からのオフセット情報を検索する(位置と、この方法の結果との差)
0
私はこのような生産者からのトピックオフセット(high
とlow
)を読み取ることができた(私は最初のメッセージを消費したくなかった):
var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));
関連する問題
- 1. kafkaトピックのパーティションの最新のオフセットを取得する方法は?
- 2. kafkaとSpark:API経由でトピックの最初のオフセットを取得
- 3. Kafka Connector - Kafka用のJMSSourceConnectorトピック
- 4. kafkaの最新のオフセットを取得する方法
- 5. kafkaトピックの最新のオフセットを取得するにはどうすればよいですか?
- 6. トピックにある最新のレコード/メッセージkafka
- 7. Faking Confluent .NET Kafkaコンシューマ
- 8. confluent-kafka python avro messages
- 9. Confluent Kafkaの再生メッセージ
- 10. confluent-kafkaのプロデューサ設定
- 11. kafkaはローカルデータベースからkafkaへの移行をオフセットします
- 12. Spark Streamingの最初からKafkaトピックからレコードを読み取る方法は?
- 13. Kafka Consumer最大オフセットから読み取るプロパティ
- 14. Apache Kafka:カフカから最新のメッセージを受け取るには?
- 15. Kafka 10 kafka-consumer-groups.shは、特定のグループについて単一のトピックのオフセットを記述できますか?
- 16. confluent-kafka avro with pipをインストールする
- 17. Filter-Interceptor-Kafkaのトピック
- 18. KafkaのトピックをMySqlにエクスポート
- 19. iOSからKafkaトピックにメッセージを送信
- 20. kafka-nodeライブラリを使用してkafkaサーバからトピックのリストを取得するにはどうすればよいですか?
- 21. Kafka-pythonはトピックのリストを取得します
- 22. Golang Kafkaのパーティションのコンシューマグループのオフセットを取得する方法10
- 23. confluent-kafka-python:パラメータなしのConsumer.commit()
- 24. Confluent Kafka&docker-compose - 実行中エラーの例
- 25. Javaアプリケーションからapache kafkaのトピックを購読するには?
- 26. イベントvsトピックApache Kafka
- 27. Spring KafkaクライアントがKerberos対応のKafka Brokerトピックからメッセージを取得できません。
- 28. C++コード内のkafkaからトピックを削除
- 29. KafkaエラーINVALID_ARGいいえconfluent-kafka-pythonを使用している場合のsasl.mechanisms
- 30. KStream(Confluent Platform)でオフセット値を取得するには