2017-09-05 2 views
1

カフカのトピックのTTLを7日に設定しました。私はKafkaからデータを取得していますが、DBに格納していますが、今私はKafkaから最後の5日間のメッセージを取り出してDBに格納しなければなりません 注:最後の5日間からKafkaに問題はありません。Javaを使用してカフカから最後の5日間のメッセージを取得する方法

+0

オフセット値を使用して消耗する必要があります。たとえば、最後の読み取りがオフセット100の場合は、オフセット101からそれを消費する必要があります。 –

+0

このオフセットの概念をJavaでどのように使用できますか、格納していないため、格納されたメッセージの最終オフセット値を知る方法を教えてください任意のオフセット値 – Sat

答えて

5

まずとき5日前のタイムスタンプのために、各パーティションのオフセットを取得するためにconsumer.offsetsForTimes()を呼び出し次に、あなたのトピックのためのパーティション

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)

を取得するためにconsumer.partitionsFor()メソッドを呼び出します最後のメッセージは正常に処理されました。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

その後、通常どおりにその時点でのオフセット現在の消費者を配置し、(ポーリングを呼び出すために継続)とプロセスメッセージする)(consumer.seekを呼び出します。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

1

前のニースに答えるために、私はあなたのトピックのためのパーティションを取得するためにその呼び出しpartitionsForメソッドを追加し、@Hansが言ったように行うだろう。

+1

ありがとうございます。適切な第一歩を含めるように答えを更新しました。 –

関連する問題