2017-02-28 22 views
1

私はいくつかのデータリプレイを実装したいと思っていました。そのためには、私は結合を使用しているので、カフカ保持ポリシーを使用する必要があります。 )。 P.P.私はこのようなトピックに自分のデータを送信しカフカバージョン0.10.1.1Kafka保持ポリシーが期待どおりに機能しない

を使用しています:

kafkaProducer.send(
        new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r) 
      ); 

そして、私はこのように私のトピックを作成します。

カフカ-トピックが--create - -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms = 172800000

上記の設定では、トピックの保存期間を48時間に設定する必要があります。

私はそれぞれのメッセージの実際の時刻を記録するためにTimestampExtractorを拡張します。

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor { 
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class); 
    @Override 
    public long extract(ConsumerRecord<Object, Object> consumerRecord) { 
     LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp())); 
     return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis(); 
    } 
} 

テストのために、私のトピックに4つのメッセージを送信しました。この4つのログメッセージが表示されます。

2017年2月28日午前10時23分39秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1488295086292ヒトreadble -Tue 2月28日10時18分06秒EST 2017
2017年2月28日10時24 :01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:14832.72億ヒトreadble -Sun 1月1日午前7時00分○○秒EST 2017
2017年2月28日10時26分11秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 14858.208億ヒトreadble -mon Jan 30 19:00:00 EST 2017
2017-02-28 10時27分22秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:1488295604411人間readble -Tue 2月28日10時26分44秒EST 2017

だから私は私のメッセージとして送らの二つが消去されます/削除見込ま​​に基づきます5分後(2月と3月は1月1日と1月30日のためメッセージが送信されます)。しかし、私は1時間私の話題を消費しようとしたし、私はすべての4つのメッセージを持って私の話題を消費するたびに。

カフカ-アブロ・コンソール・消費者--zookeeperはlocalhost:2181 --from-始まる--topic myTopic

マイカフカの設定は、このようなものです

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 

アム私は何か間違っているか、ここで何かが恋しいですか?

答えて

5

Kafkaは、ログセグメントを削除することによってその保持ポリシーを実装します。 Kafkaは、アクティブなセグメントを削除しません。アクティブなセグメントは、パーティションに送信された新しいメッセージを追加するセグメントです。カフカは古いセグメントのみを削除します。新しいメッセージがパーティションに送信され、そしていずれか

  • 新しいメッセージとアクティブなセグメントのサイズはlog.segment.bytes、又は
  • 最初のタイムスタンプを超える場合カフカは、古いセグメントに活性なセグメントをロールアクティブセグメントにおけるメッセージlog.roll.msよりも古い(デフォルトは7日です)

だからあなたの例では、あなたは、2月28日(火)10時18分06秒EST 2017後7日を待って、新しいメッセージを送信し、しなければなりません4つの初期メッセージはすべて削除されます。

+0

もし私が5分後に1970年(非常に古いメッセージ)のタイムスタンプを持つ2つのメッセージを送信したとしたらどうなるのでしょうか? – Am1rr3zA

関連する問題