私はいくつかのデータリプレイを実装したいと思っていました。そのためには、私は結合を使用しているので、カフカ保持ポリシーを使用する必要があります。 )。 P.P.私はこのようなトピックに自分のデータを送信しカフカバージョン0.10.1.1Kafka保持ポリシーが期待どおりに機能しない
を使用しています:
kafkaProducer.send(
new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
);
そして、私はこのように私のトピックを作成します。
カフカ-トピックが--create - -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms = 172800000
上記の設定では、トピックの保存期間を48時間に設定する必要があります。
私はそれぞれのメッセージの実際の時刻を記録するためにTimestampExtractor
を拡張します。
public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord) {
LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
}
}
テストのために、私のトピックに4つのメッセージを送信しました。この4つのログメッセージが表示されます。
2017年2月28日午前10時23分39秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1488295086292ヒトreadble -Tue 2月28日10時18分06秒EST 2017
2017年2月28日10時24 :01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:14832.72億ヒトreadble -Sun 1月1日午前7時00分○○秒EST 2017
2017年2月28日10時26分11秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 14858.208億ヒトreadble -mon Jan 30 19:00:00 EST 2017
2017-02-28 10時27分22秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:1488295604411人間readble -Tue 2月28日10時26分44秒EST 2017
だから私は私のメッセージとして送らの二つが消去されます/削除見込まに基づきます5分後(2月と3月は1月1日と1月30日のためメッセージが送信されます)。しかし、私は1時間私の話題を消費しようとしたし、私はすべての4つのメッセージを持って私の話題を消費するたびに。
マイカフカの設定は、このようなものですカフカ-アブロ・コンソール・消費者--zookeeperはlocalhost:2181 --from-始まる--topic myTopic
:
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
アム私は何か間違っているか、ここで何かが恋しいですか?
もし私が5分後に1970年(非常に古いメッセージ)のタイムスタンプを持つ2つのメッセージを送信したとしたらどうなるのでしょうか? – Am1rr3zA