#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')
NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)
function red() {
echo -e "$RED$*$NORMAL"
}
function green() {
echo -e "$GREEN$*$NORMAL"
}
function yellow() {
echo -e "$YELLOW$*$NORMAL"
}
for topic in $topics; do
yellow "Cleaning up messages in topic @ " $topic
yellow "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
done
red "Waiting 120 seconds for messages to expire"
sleep 120
for topic in $topics; do
green "Restoring config of topic @ " $topic
green "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
$KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic
done
このスクリプトを実行すると、config.retention.msが100msに変更されたことがわかりますが、120秒遅れても、すべてのカフカトピックで同じメッセージが表示されます。Kafkaのretention.msはKafka 0.10.2で強制されていませんか?
メッセージをパージするにはどうすればよいですか?
おかげで、 ドミトリー
ここで説明する動作は、0.10.2より前のバージョンに適用されます。最後に変更された時間は使用されなくなりました。代わりに、時間インデックスが使用されます。 – dawsaw
あなたはそれのためのリンクまたはKIPを持っているのですか?現在のドキュメントでは、「データは一度に1つのログセグメントだけ削除されます。ログマネージャは、プラグ可能な削除ポリシーによって、削除対象のファイルを選択できます。最後のN GBを保持していたポリシーも有用かもしれません。 (https://kafka.apache.org/documentation/#impl_deletes) – ftr
0.10.1の注目すべき変更ですhttps://kafka.apache.org/documentation/#upgrade_10_1_breaking – dawsaw