
Kafka log compaction not starting: I have a topic with the following description

Topic:test-topic PartitionCount:1 ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,min.compaction.lag.ms=86400000,cleanup.policy=compact 
    Topic: test-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1 

My broker has log.cleaner.enable=true. The topic holds 870778 messages, and many of them share duplicate keys (some keys reach thousands of duplicates). According to the Kafka docs, under these conditions Kafka should run log compaction and prune all but the latest message for each key. This is not happening, even after several weeks. What am I missing here to get log compaction started?
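For reference, the topic description above and its overrides come from the standard Kafka CLI tools; something along these lines (the ZooKeeper connection string is a placeholder) should reproduce it:

# describe the topic itself
bin/kafka-topics.sh --zookeeper <connection-string> --describe --topic test-topic 

# show the per-topic overrides (cleanup.policy, min.cleanable.dirty.ratio, ...)
bin/kafka-configs.sh --zookeeper <connection-string> --entity-type topics --entity-name test-topic --describe 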

Broker config:

# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements. See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License. You may obtain a copy of the License at 
# 
# http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
# see kafka.server.KafkaConfig for additional details and defaults 

############################# Server Basics ############################# 

# The id of the broker. This must be set to a unique integer for each broker. 
broker.id=1 

############################# Socket Server Settings ############################# 

# The port the socket server listens on 
port=<port> 

# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
#host.name=localhost 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
#advertised.host.name=<hostname routable by clients> 

# The port to publish to ZooKeeper for clients to use. If this is not set, 
# it will publish the same port that the broker binds to. 
#advertised.port=<port accessible by clients> 

# The number of threads handling network requests 
num.network.threads=8 

# The number of threads doing disk I/O 
num.io.threads=8 

# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=1048576 

# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=1048576 

# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 

############################# Log Basics ############################# 

# A comma seperated list of directories under which to store log files 
log.dirs=<dir-path> 

# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=30 

############################# Log Flush Policy ############################# 

# Messages are immediately written to the filesystem but by default we only fsync() to sync 
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here: 
# 1. Durability: Unflushed data may be lost if you are not using replication. 
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. 
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
# The settings below allow one to configure the flush policy to flush data after a period of time or 
# every N messages (or both). This can be done globally and overridden on a per-topic basis. 

# The number of messages to accept before forcing a flush of data to disk 
log.flush.interval.messages=20000 
inter.broker.protocol.version=0.8.2.0 
log.message.format.version=0.8.2.0 

# The maximum amount of time a message can sit in a log before we force a flush 
log.flush.interval.ms=10000 
message.max.bytes=1000000 
auto.create.topics.enable=false 
log.index.interval.bytes=4096 
log.index.size.max.bytes=10485760 
log.flush.scheduler.interval.ms=2000 
log.roll.hours=24 
log.retention.check.interval.ms=300000 
log.segment.bytes=1073741824 
############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=24 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=536870912 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. 
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. 
log.cleaner.enable=true 

default.replication.factor=3 
num.replica.fetchers=4 
replica.fetch.max.bytes=1048576 
replica.fetch.wait.max.ms=2000 
replica.high.watermark.checkpoint.interval.ms=5000 
replica.socket.timeout.ms=60000 
replica.socket.receive.buffer.bytes=65536 
replica.lag.time.max.ms=30000 
replica.lag.max.messages=12000 

controller.socket.timeout.ms=60000 
controller.message.queue.size=20 

auto.leader.rebalance.enable=true 
leader.imbalance.per.broker.percentage=5 
leader.imbalance.check.interval.seconds=300 

############################# Zookeeper ############################# 

# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 

zookeeper.connect=<connection-string> 

# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=1000000 
#zk.sync.time.ms=2000 

kafka.metrics.reporters=com.airbnb.kafka.KafkaStatsdMetricsReporter 

# enable the reporter, (false) 
external.kafka.statsd.reporter.enabled=true 

# the host of the StatsD server (localhost) 
external.kafka.statsd.host=statsd 
# the port of the StatsD server (8995) 
external.kafka.statsd.port=<port> 

# a prefix for all metrics names (empty) 
external.kafka.statsd.metrics.prefix=<connection-string> 

It would help if you shared the full broker configuration you are using. – tchap


@tchap added broker config – ethan123

Answer


You need at least two segment files (one completed and one still active) for compaction to run.

(Also, check why you have the same property twice:)

log.segment.bytes=1073741824 
log.segment.bytes=536870912 

Depending on which value applies in your configuration, that means at least one full 512 MB segment file must exist before Kafka can perform compaction. Make sure there is more than one segment file for the topic partition you want compacted.
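If the topic simply never accumulates enough data to roll a segment, one option is to lower the segment size on the topic itself instead of editing the broker config. A sketch, where the ZooKeeper connection string and the 100 MB value are only examples:

bin/kafka-configs.sh --zookeeper <connection-string> --alter --entity-type topics --entity-name test-topic --add-config segment.bytes=104857600 

Once the active segment rolls, the cleaner has a closed segment it can compact.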


log.segment.bytes was the problem! My topic never crossed that threshold. Not sure why it appears twice, but thanks for pointing it out. I can lower the threshold on the topic itself without editing the broker config. On top of that, we found another bug in our logs that was causing the thread responsible for compaction to die for good -> https://blog.heroku.com/debugging-kafka-compacted-topics. Anyway, thanks! – ethan123
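For anyone chasing the same symptom: the cleaner threads write their progress to log-cleaner.log under the broker's log4j log directory, so a rough check along these lines (the path is an example) shows whether a cleaner thread has died:

grep -iE "error|exception" /opt/kafka/logs/log-cleaner.log 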


Yeah, that's exactly what I expected :-) – tchap
