0
私はKafka-StreamコンシューマでKafkaのレコードを読みたいと思っています。例えば1分ごとに?Kafka-Streamを使ってKafka-Streamを使ってレコードを読み取る方法
私はKafka-StreamコンシューマでKafkaのレコードを読みたいと思っています。例えば1分ごとに?Kafka-Streamを使ってKafka-Streamを使ってレコードを読み取る方法
私はあなたの質問を正しく理解しているかどうかわかりません。しかし、私は複数のアプローチがあると思います(あなたが実際に何を達成したいのかは、質問からは不明確です)。
KafkaConsumer
を使用したい場合は、メッセージを読んで、タイムスタンプをチェックして、間隔でデータをチャンクすることができます。
私は、カフカから1分ごとにデータを取得するオプションがあることを示しています(例:10:01に10:00から10:01までのすべてのレコードを読む、10:02ですべて読む実行時に新しいレコードを取得するのではなく、10時01分から10時02分までのレコードなど) 処理が完了するまでメモリ内のデータを保持するのではなく、指定された間隔ごとに処理するためにデータを読み取る必要があります。 – user7365161
Kafkaはプルベースなので、これは組み込みのサポートはありません。提案されたアプローチの1つを使用して、このロジックをクライアントに配置する必要があります。あなたが正しくコメントしていると分かっている場合は、poll()を開始する前に現在のend-of-logオフセットを取得するために、approach(3)を組み合わせて使用し、得られたオフセットにコンシューマメッセージのみを追加するあなたが消費を開始した後) –