2017-04-17 13 views
2
#!/bin/zsh 
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16') 
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092') 
topics=('t1' 't2' 't1_failed' 't2_failed') 


NORMAL=$(tput sgr0) 
GREEN=$(tput setaf 2; tput bold) 
YELLOW=$(tput setaf 3) 
RED=$(tput setaf 1) 

function red() { 
    echo -e "$RED$*$NORMAL" 
} 

function green() { 
    echo -e "$GREEN$*$NORMAL" 
} 

function yellow() { 
    echo -e "$YELLOW$*$NORMAL" 
} 



for topic in $topics; do 
    yellow "Cleaning up messages in topic @ " $topic 
    yellow "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
done 

red "Waiting 120 seconds for messages to expire" 
sleep 120 

for topic in $topics; do 
    green "Restoring config of topic @ " $topic                 
    green "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms     
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
    $KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic 
done 

このスクリプトを実行すると、config.retention.msが100msに変更されたことがわかりますが、120秒遅れても、すべてのカフカトピックで同じメッセージが表示されます。Kafkaのretention.msはKafka 0.10.2で強制されていませんか?

メッセージをパージするにはどうすればよいですか?

おかげで、 ドミトリー

答えて

3

あなたは5分にlog.retention.check.interval.msデフォルトのを待つ必要があります。

5

受け入れられた回答よりも多少それがあります。 Kafkaはメッセージをファイルシステムのログファイルに保存します。これらのファイルにはロールオーバがあります(時間またはサイズによって構成されます)。ファイルが現在のファイルではなくなると、カフカはそのファイルに追加されなくなります。

今や楽しい部分です:カフカは個々のメッセージの有効期限はありません。そのファイル内のメッセージの最高のタイムスタンプがretention.msより古い場合、ログファイル全体が削除されます(圧縮されていないトピックの場合)。保持時間を指定すると、メッセージは以上(少なくとも)長くなる可能性がありますが、ロールオーバの設定やメッセージの量によってはるかに長い時間利用できる可能性があります。

古いKafkaバージョンでは、メッセージのタイムスタンプに基づいていませんが、ログファイルへの書き込みアクセス権があります。これを指摘してくれた@dawsawに感謝します。

+0

ここで説明する動作は、0.10.2より前のバージョンに適用されます。最後に変更された時間は使用されなくなりました。代わりに、時間インデックスが使用されます。 – dawsaw

+0

あなたはそれのためのリンクまたはKIPを持っているのですか?現在のドキュメントでは、「データは一度に1つのログセグメントだけ削除されます。ログマネージャは、プラグ可能な削除ポリシーによって、削除対象のファイルを選択できます。最後のN GBを保持していたポリシーも有用かもしれません。 (https://kafka.apache.org/documentation/#impl_deletes) – ftr

+0

0.10.1の注目すべき変更ですhttps://kafka.apache.org/documentation/#upgrade_10_1_breaking – dawsaw

関連する問題