Kafka 0.10.0を使用しています。処理を始める前に、パーティション内のレコードのサイズを知りたいと考えています。Kafka 0.10で、トピックの各パーティションのオフセット範囲を調べるにはどうすればよいでしょうか?
バージョン0.9.0.1では、次のコードを使用して、パーティションの`earliest`オフセットと`latest`オフセットの範囲(差)を求めていました。新しいバージョンでは、`consumer#position`メソッドを呼び出したところで処理が止まってしまい、値を取得できません。
package org.apache.kafka.example.utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Looks up the earliest and latest offsets of every partition of a Kafka topic.
 *
 * <p>All functionality is exposed through static methods; the consumer used for the
 * lookup is created and closed inside {@link #getOffsets(String)}.
 */
public class FindTopicRange {

    private static final Logger logger = LogManager.getLogger();

    /**
     * Fully-qualified name of Kafka's byte-array deserializer.
     *
     * <p>The consumer's {@code key.deserializer} / {@code value.deserializer} settings must
     * name a {@code Deserializer} implementation. The class name is given as a string so
     * that no additional import is required.
     */
    private static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    /**
     * Creates an instance. Kept public for backward compatibility; the class holds no
     * instance state.
     */
    public FindTopicRange() {
    }

    /**
     * Returns the offset range of each partition of {@code topic}.
     *
     * <p>For every partition the consumer is positioned at the beginning and then at the end
     * of the log, and the two positions are recorded as the bounds of the range.
     *
     * <p>NOTE(review): {@code KafkaConsumer#position} talks to the brokers and can block
     * until they respond — confirm broker availability/compatibility if this call hangs.
     *
     * @param topic name of the topic to inspect
     * @return a map from partition to its {@code [earliest, latest]} offset range; empty if the
     *         topic has no known partitions, and possibly partial if an error occurred midway
     *         (the error is logged, not rethrown)
     */
    public static Map<TopicPartition, Range<Long>> getOffsets(String topic) {
        Map<TopicPartition, Range<Long>> partitionToRange = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerConfigs())) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            // Guard explicitly instead of relying on a caught NullPointerException:
            // presumably some client versions return null for an unknown topic.
            if (partitionInfos == null || partitionInfos.isEmpty()) {
                logger.warn("No partitions found for topic : {}", topic);
                return partitionToRange;
            }

            List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
            for (PartitionInfo partitionInfo : partitionInfos) {
                partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }

            // Manual assignment: no subscription is made for this lookup.
            consumer.assign(partitions);

            for (TopicPartition partition : partitions) {
                consumer.seekToBeginning(Collections.singletonList(partition));
                long earliestOffset = consumer.position(partition);

                consumer.seekToEnd(Collections.singletonList(partition));
                long latestOffset = consumer.position(partition);

                partitionToRange.put(partition, Range.between(earliestOffset, latestOffset));
            }
        } catch (Exception e) {
            // Best effort: report the failure and return whatever was collected so far.
            logger.error("Exception while getting offset range information for topic : {}", topic, e);
        }
        return partitionToRange;
    }

    /**
     * Builds the configuration for the short-lived consumer used by
     * {@link #getOffsets(String)}.
     *
     * @return consumer properties pointing at {@code localhost:9092} with byte-array
     *         key and value deserializers
     */
    private static Properties getConsumerConfigs() {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10240);
        // A consumer needs Deserializer implementations here, not Serializer ones.
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BYTE_ARRAY_DESERIALIZER);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BYTE_ARRAY_DESERIALIZER);
        return configs;
    }

    /**
     * Prints the offset ranges of the {@code hello} topic to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println(getOffsets("hello"));
    }
}
上記のコードを実行した際のスタックトレース(スレッドダンプ)を以下に示します。
"main" prio=10 tid=0x00007f1750013800 nid=0x443 runnable [0x00007f1756b88000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x00000007c21cba00> (a sun.nio.ch.Util$2)
- locked <0x00000007c21cb9f0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000007c21cb8d8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.kafka.common.network.Selector.select(Selector.java:454)
at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:324)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:306)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1405)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1196)
`KafkaConsumer`をインスタンス化せずに、`latest`と`earliest`のオフセットを取得する方法はありますか?